diff --git a/src/recallforge/search.py b/src/recallforge/search.py index 6caf39b..bce3ad5 100644 --- a/src/recallforge/search.py +++ b/src/recallforge/search.py @@ -15,12 +15,13 @@ """ import concurrent.futures +import copy import json import logging import os import re import time -from dataclasses import dataclass, field +from dataclasses import dataclass, field, replace from hashlib import sha256 from typing import List, Dict, Any, Optional, Union @@ -99,6 +100,20 @@ def _log_stage_metrics( logger.debug("stage_metrics " + " ".join(log_parts)) +def _canonicalize_memory_result_paths(result: "HybridResult", canonical_root_path: Optional[str]) -> tuple[str, str]: + """Return a stable result filepath plus display path for rolled memory hits.""" + if not canonical_root_path: + return result.filepath, result.display_path + + if str(result.filepath or "").startswith("recallforge://"): + return ( + f"recallforge://{result.collection}/{canonical_root_path}", + f"{result.collection}/{canonical_root_path}", + ) + + return canonical_root_path, canonical_root_path + + # Intent-to-weight mappings for RRF fusion # Each intent maps source names to weight multipliers INTENT_WEIGHTS: Dict[str, Dict[str, float]] = { @@ -132,6 +147,8 @@ class SearchAudit: blend_weights: Dict[str, float] = field(default_factory=dict) # rrf_weight, rerank_weight media_compensation_applied: bool = False # Whether media boost was applied in RRF memory_rollup_boost: float = 1.0 # Multiplier applied when sibling assets are rolled up + memory_primary_evidence_path: Optional[str] = None + memory_supporting_paths: List[str] = field(default_factory=list) final_blended_score: float = 0.0 @@ -157,6 +174,8 @@ class HybridResult: memory_role: str = "root" memory_root_path: Optional[str] = None memory_hit_count: int = 1 + memory_primary_evidence_path: Optional[str] = None + memory_supporting_paths: Optional[List[str]] = None tags: Optional[List[str]] = None audit: Optional[SearchAudit] = None # Per-result audit trail @@ -1239,15 +1258,57 @@ def _merge_tags(items: List[HybridResult]) -> Optional[List[str]]: rolled: List[HybridResult] = [] for key in order: group = sorted(grouped[key], key=lambda item: item.score, reverse=True) - representative = group[0] + top_hit = group[0] + root_candidate = next( + (item for item in group if item.memory_role == "root"), + None, + ) + representative = replace(root_candidate or top_hit) + representative.score = top_hit.score + representative.rrf_rank = top_hit.rrf_rank + representative.rerank_score = top_hit.rerank_score + representative.source = top_hit.source + representative.audit = copy.deepcopy(top_hit.audit) if top_hit.audit else None + representative.context = representative.context or top_hit.context + representative.body = representative.body or top_hit.body + representative.hash = representative.hash or top_hit.hash + representative.docid = representative.docid or top_hit.docid + representative.modified_at = representative.modified_at or top_hit.modified_at + representative.body_length = representative.body_length or top_hit.body_length + + canonical_path = representative.memory_root_path or top_hit.memory_root_path + if canonical_path: + representative.filepath, representative.display_path = _canonicalize_memory_result_paths( + representative, + canonical_path, + ) + if not root_candidate: + representative.title = os.path.basename(canonical_path) + representative.memory_root_path = canonical_path + else: + representative.memory_root_path = representative.filepath + + representative.memory_role = "root" representative.memory_hit_count = len(group) representative.tags = _merge_tags(group) + representative.memory_primary_evidence_path = top_hit.filepath + representative.memory_supporting_paths = [ + item.filepath + for item in group + if item.filepath not in {representative.filepath, top_hit.filepath} + ][:5] memory_rollup_boost = 1.0 if len(group) > 1: memory_rollup_boost += min(0.15, 0.03 * (len(group) - 1)) representative.score *= memory_rollup_boost if representative.audit: + representative.audit.filepath = representative.filepath + representative.audit.content_type = representative.content_type representative.audit.memory_rollup_boost = memory_rollup_boost + representative.audit.memory_primary_evidence_path = top_hit.filepath + representative.audit.memory_supporting_paths = list( + representative.memory_supporting_paths or [] + ) representative.audit.final_blended_score = representative.score rolled.append(representative) @@ -1570,7 +1631,7 @@ def run_single_query(q: BatchQuery) -> List[tuple]: return [(r, r.score) for r in results] # Run all queries in parallel - all_results: Dict[int, List[tuple]] = {} + all_results: List[List[tuple]] = [[] for _ in batch_queries] with concurrent.futures.ThreadPoolExecutor(max_workers=max_workers) as executor: future_to_idx = { executor.submit(run_single_query, q): i @@ -1587,7 +1648,7 @@ def run_single_query(q: BatchQuery) -> List[tuple]: # Merge results using RRF with best-score-wins merged: Dict[str, Dict[str, Any]] = {} - for idx, results in all_results.items(): + for idx, results in enumerate(all_results): weight = batch_queries[idx].weight for rank, (result, score) in enumerate(results): filepath = result.filepath diff --git a/src/recallforge/server.py b/src/recallforge/server.py index 6b4d380..5003741 100644 --- a/src/recallforge/server.py +++ b/src/recallforge/server.py @@ -1007,6 +1007,8 @@ async def _handle_explain_results(arguments: dict, backend, storage) -> list[Tex "memory_role": getattr(r, "memory_role", "root"), "memory_root_path": getattr(r, "memory_root_path", None), "memory_hit_count": getattr(r, "memory_hit_count", 1), + "memory_primary_evidence_path": getattr(r, "memory_primary_evidence_path", None), + "memory_supporting_paths": getattr(r, "memory_supporting_paths", None), "tags": getattr(r, "tags", None), } @@ -1029,6 +1031,8 @@ async def _handle_explain_results(arguments: dict, backend, storage) -> list[Tex "memory_rollup": { "memory_hit_count": getattr(r, "memory_hit_count", 1), "boost": round(r.audit.memory_rollup_boost, 6), + "primary_evidence_path": r.audit.memory_primary_evidence_path, + "supporting_paths": list(r.audit.memory_supporting_paths), }, } else: diff --git a/src/recallforge/storage/indexing_ops.py b/src/recallforge/storage/indexing_ops.py index 5c23bd6..c5bb1cb 100644 --- a/src/recallforge/storage/indexing_ops.py +++ b/src/recallforge/storage/indexing_ops.py @@ -132,7 +132,7 @@ def _parse_generated_media_tags(self, raw: str) -> List[str]: if not text: return [] - fenced_match = re.match(r"^```(?:[A-Za-z0-9_+-]+)?\s*\n?(.*?)\n?```$", text, flags=re.DOTALL) + fenced_match = re.search(r"```(?:[A-Za-z0-9_+-]+)?\s*(.*?)\s*```", text, flags=re.DOTALL) if fenced_match: text = fenced_match.group(1).strip() diff --git a/tests/test_config_tools.py b/tests/test_config_tools.py index 15cea0c..79d4b58 100644 --- a/tests/test_config_tools.py +++ b/tests/test_config_tools.py @@ -510,6 +510,8 @@ async def test_explain_results_surfaces_memory_rollup_provenance(self): memory_role="root", memory_root_path="notes/demo.md", memory_hit_count=3, + memory_primary_evidence_path="notes/demo.md::section:0001", + memory_supporting_paths=["notes/demo.md::image:0001"], audit=SearchAudit( filepath="notes/demo.md", content_type="text", @@ -521,6 +523,8 @@ async def test_explain_results_surfaces_memory_rollup_provenance(self): blend_weights={"rrf": 0.8, "rerank": 0.2}, media_compensation_applied=False, memory_rollup_boost=1.06, + memory_primary_evidence_path="notes/demo.md::section:0001", + memory_supporting_paths=["notes/demo.md::image:0001"], final_blended_score=0.8123, ), ) @@ -536,8 +540,18 @@ async def test_explain_results_surfaces_memory_rollup_provenance(self): self.assertEqual(explained["memory_id"], "mem-123") self.assertEqual(explained["memory_hit_count"], 3) self.assertEqual(explained["memory_root_path"], "notes/demo.md") + self.assertEqual(explained["memory_primary_evidence_path"], "notes/demo.md::section:0001") + self.assertEqual(explained["memory_supporting_paths"], ["notes/demo.md::image:0001"]) self.assertEqual(explained["provenance"]["memory_rollup"]["memory_hit_count"], 3) self.assertAlmostEqual(explained["provenance"]["memory_rollup"]["boost"], 1.06) + self.assertEqual( + explained["provenance"]["memory_rollup"]["primary_evidence_path"], + "notes/demo.md::section:0001", + ) + self.assertEqual( + explained["provenance"]["memory_rollup"]["supporting_paths"], + ["notes/demo.md::image:0001"], + ) async def test_search_file_path_routes_through_text_query(self): backend = _make_backend() diff --git a/tests/test_search_batch.py b/tests/test_search_batch.py index ea038b6..6a4342a 100644 --- a/tests/test_search_batch.py +++ b/tests/test_search_batch.py @@ -6,6 +6,7 @@ import os import sys +import time import unittest from dataclasses import dataclass from typing import List, Dict, Any, Optional @@ -567,12 +568,11 @@ def test_same_document_merges_tags_deterministically(self): ], ] - call_idx = [0] - def mock_search(self, query): - idx = call_idx[0] - call_idx[0] += 1 - return results_list[idx] + if query == "query one": + time.sleep(0.05) + return results_list[0] + return results_list[1] with patch.object(HybridSearcher, '__init__', lambda self, **kwargs: None): with patch.object(HybridSearcher, 'search', mock_search): diff --git a/tests/test_search_pipeline.py b/tests/test_search_pipeline.py index fcde078..f0105a3 100644 --- a/tests/test_search_pipeline.py +++ b/tests/test_search_pipeline.py @@ -457,6 +457,54 @@ def test_memory_rollup_merges_tags_from_sibling_assets(self): self.assertEqual(len(rolled), 1) self.assertEqual(rolled[0].tags, ["diagram", "meeting notes"]) + def test_memory_rollup_preserves_collection_qualified_filepath(self): + searcher = HybridSearcher(backend=StubBackend(), storage=StubStorage(), limit=12) + + root = _make_search_result("recallforge://alpha/memories/demo.mp4", 0.2, "vec", "video") + child = _make_search_result( + "recallforge://alpha/memories/demo.mp4::transcript:0001", + 0.9, + "fts", + "text", + ) + sibling = _make_search_result( + "recallforge://alpha/memories/demo.mp4::frame:0001", + 0.6, + "vec", + "image", + ) + for item in (root, child, sibling): + item.collection = "alpha" + item.memory_id = "memory-evidence" + item.memory_root_path = "memories/demo.mp4" + root.memory_role = "root" + child.memory_role = "child" + sibling.memory_role = "child" + root.body = "Canonical demo video summary." + + blended = searcher._blend_scores( + [child, sibling, root], + { + child.filepath: 0.9, + sibling.filepath: 0.6, + root.filepath: 0.2, + }, + ) + + self.assertEqual(len(blended), 1) + result = blended[0] + self.assertEqual(result.filepath, "recallforge://alpha/memories/demo.mp4") + self.assertEqual(result.display_path, "alpha/memories/demo.mp4") + self.assertEqual(result.memory_root_path, "memories/demo.mp4") + self.assertEqual( + result.memory_primary_evidence_path, + "recallforge://alpha/memories/demo.mp4::transcript:0001", + ) + self.assertEqual( + result.memory_supporting_paths, + ["recallforge://alpha/memories/demo.mp4::frame:0001"], + ) + class TestParallelSearchTaskCapture(unittest.TestCase): def test_parallel_search_captures_original_vector(self): diff --git a/tests/test_storage.py b/tests/test_storage.py index be35123..72ef376 100644 --- a/tests/test_storage.py +++ b/tests/test_storage.py @@ -1362,6 +1362,33 @@ def fenced_json(_prompt: str, max_tokens: int = 60) -> str: ["diagram", "hidden layers", "neural network"], ) + def test_generated_media_tags_extract_fenced_json_from_wrapped_text(self): + embedder = CaptioningEmbedder() + + def wrapped_fenced_json(_prompt: str, max_tokens: int = 60) -> str: + return ( + "Here are the tags:\n" + "```json\n" + '["diagram", "hidden layers", "neural network"]\n' + "```" + ) + + embedder.generate_text = wrapped_fenced_json + + self.backend.index_image( + path=self.image_path, + collection="test", + embed_func=embedder, + caption_media=True, + ) + + rows = self.backend._embeddings_table.search().where("content_type = 'image'").to_list() + self.assertEqual(len(rows), 1) + self.assertEqual( + json.loads(rows[0].get("tags") or "[]"), + ["diagram", "hidden layers", "neural network"], + ) + def test_index_video_keeps_parent_memory_and_links_children(self): embedder = CaptioningEmbedder() logical_path = str(Path(self.video_path).expanduser().resolve()) diff --git a/tests/test_watch_folder.py b/tests/test_watch_folder.py index 56fd23a..a389ddb 100644 --- a/tests/test_watch_folder.py +++ b/tests/test_watch_folder.py @@ -39,6 +39,15 @@ def embed_image(self, image): return [0.0] * 8 +def _wait_until(predicate, timeout: float = 3.0, interval: float = 0.05) -> bool: + deadline = time.time() + timeout + while time.time() < deadline: + if predicate(): + return True + time.sleep(interval) + return predicate() + + def test_watch_folder_create_and_modify(tmp_path): storage = FakeStorage() backend = FakeBackend() @@ -58,15 +67,16 @@ def test_watch_folder_create_and_modify(tmp_path): f = watched / "note.md" f.write_text("v1", encoding="utf-8") - time.sleep(0.35) f.write_text("v2", encoding="utf-8") - time.sleep(0.35) + assert _wait_until( + lambda: bool(storage.upserts) and storage.upserts[-1]["text"] == "v2" + ) daemon.stop_watch(watch_id) - assert len(storage.upserts) >= 2 - assert storage.upserts[0]["path"] == "note.md" + assert storage.upserts + assert storage.upserts[-1]["path"] == "note.md" assert storage.upserts[-1]["text"] == "v2" @@ -89,10 +99,9 @@ def test_watch_folder_delete(tmp_path): ) watch_id = daemon.start_watch(config) - time.sleep(0.2) f.unlink() - time.sleep(0.35) + assert _wait_until(lambda: ("old.md", "default", False) in storage.deletes) daemon.stop_watch(watch_id) @@ -145,9 +154,9 @@ def test_watch_folder_image_uses_logical_path(tmp_path): image_path = watched / "diagram.png" image_path.write_bytes(b"fake image bytes") - time.sleep(1.0) + assert _wait_until(lambda: bool(storage.image_indexes)) image_path.unlink() - time.sleep(1.0) + assert _wait_until(lambda: ("diagram.png", "default", False) in storage.deletes) daemon.stop_watch(watch_id) @@ -175,9 +184,9 @@ def test_watch_folder_document_uses_logical_path_and_child_cleanup(tmp_path): document_path = watched / "notes.docx" document_path.write_bytes(b"placeholder") - time.sleep(1.0) + assert _wait_until(lambda: bool(storage.document_indexes)) document_path.unlink() - time.sleep(1.0) + assert _wait_until(lambda: ("notes.docx", "default", True) in storage.deletes) daemon.stop_watch(watch_id)