Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
69 changes: 65 additions & 4 deletions src/recallforge/search.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,12 +15,13 @@
"""

import concurrent.futures
import copy
import json
import logging
import os
import re
import time
from dataclasses import dataclass, field
from dataclasses import dataclass, field, replace
from hashlib import sha256
from typing import List, Dict, Any, Optional, Union

Expand Down Expand Up @@ -99,6 +100,20 @@ def _log_stage_metrics(
logger.debug("stage_metrics " + " ".join(log_parts))


def _canonicalize_memory_result_paths(result: "HybridResult", canonical_root_path: Optional[str]) -> tuple[str, str]:
"""Return a stable result filepath plus display path for rolled memory hits."""
if not canonical_root_path:
return result.filepath, result.display_path

if str(result.filepath or "").startswith("recallforge://"):
return (
f"recallforge://{result.collection}/{canonical_root_path}",
f"{result.collection}/{canonical_root_path}",
)

return canonical_root_path, canonical_root_path


# Intent-to-weight mappings for RRF fusion
# Each intent maps source names to weight multipliers
INTENT_WEIGHTS: Dict[str, Dict[str, float]] = {
Expand Down Expand Up @@ -132,6 +147,8 @@ class SearchAudit:
blend_weights: Dict[str, float] = field(default_factory=dict) # rrf_weight, rerank_weight
media_compensation_applied: bool = False # Whether media boost was applied in RRF
memory_rollup_boost: float = 1.0 # Multiplier applied when sibling assets are rolled up
memory_primary_evidence_path: Optional[str] = None
memory_supporting_paths: List[str] = field(default_factory=list)
final_blended_score: float = 0.0


Expand All @@ -157,6 +174,8 @@ class HybridResult:
memory_role: str = "root"
memory_root_path: Optional[str] = None
memory_hit_count: int = 1
memory_primary_evidence_path: Optional[str] = None
memory_supporting_paths: Optional[List[str]] = None
tags: Optional[List[str]] = None
audit: Optional[SearchAudit] = None # Per-result audit trail

Expand Down Expand Up @@ -1239,15 +1258,57 @@ def _merge_tags(items: List[HybridResult]) -> Optional[List[str]]:
rolled: List[HybridResult] = []
for key in order:
group = sorted(grouped[key], key=lambda item: item.score, reverse=True)
representative = group[0]
top_hit = group[0]
root_candidate = next(
(item for item in group if item.memory_role == "root"),
None,
)
representative = replace(root_candidate or top_hit)
representative.score = top_hit.score
representative.rrf_rank = top_hit.rrf_rank
representative.rerank_score = top_hit.rerank_score
representative.source = top_hit.source
representative.audit = copy.deepcopy(top_hit.audit) if top_hit.audit else None
representative.context = representative.context or top_hit.context
representative.body = representative.body or top_hit.body
representative.hash = representative.hash or top_hit.hash
representative.docid = representative.docid or top_hit.docid
representative.modified_at = representative.modified_at or top_hit.modified_at
representative.body_length = representative.body_length or top_hit.body_length

canonical_path = representative.memory_root_path or top_hit.memory_root_path
if canonical_path:
representative.filepath, representative.display_path = _canonicalize_memory_result_paths(
representative,
canonical_path,
)
if not root_candidate:
representative.title = os.path.basename(canonical_path)
representative.memory_root_path = canonical_path
else:
representative.memory_root_path = representative.filepath

representative.memory_role = "root"
representative.memory_hit_count = len(group)
representative.tags = _merge_tags(group)
representative.memory_primary_evidence_path = top_hit.filepath
representative.memory_supporting_paths = [
item.filepath
for item in group
if item.filepath not in {representative.filepath, top_hit.filepath}
][:5]
memory_rollup_boost = 1.0
if len(group) > 1:
memory_rollup_boost += min(0.15, 0.03 * (len(group) - 1))
representative.score *= memory_rollup_boost
if representative.audit:
representative.audit.filepath = representative.filepath
representative.audit.content_type = representative.content_type
representative.audit.memory_rollup_boost = memory_rollup_boost
representative.audit.memory_primary_evidence_path = top_hit.filepath
representative.audit.memory_supporting_paths = list(
representative.memory_supporting_paths or []
)
representative.audit.final_blended_score = representative.score
rolled.append(representative)

Expand Down Expand Up @@ -1570,7 +1631,7 @@ def run_single_query(q: BatchQuery) -> List[tuple]:
return [(r, r.score) for r in results]

# Run all queries in parallel
all_results: Dict[int, List[tuple]] = {}
all_results: List[List[tuple]] = [[] for _ in batch_queries]
with concurrent.futures.ThreadPoolExecutor(max_workers=max_workers) as executor:
future_to_idx = {
executor.submit(run_single_query, q): i
Expand All @@ -1587,7 +1648,7 @@ def run_single_query(q: BatchQuery) -> List[tuple]:
# Merge results using RRF with best-score-wins
merged: Dict[str, Dict[str, Any]] = {}

for idx, results in all_results.items():
for idx, results in enumerate(all_results):
weight = batch_queries[idx].weight
for rank, (result, score) in enumerate(results):
filepath = result.filepath
Expand Down
4 changes: 4 additions & 0 deletions src/recallforge/server.py
Original file line number Diff line number Diff line change
Expand Up @@ -1007,6 +1007,8 @@ async def _handle_explain_results(arguments: dict, backend, storage) -> list[Tex
"memory_role": getattr(r, "memory_role", "root"),
"memory_root_path": getattr(r, "memory_root_path", None),
"memory_hit_count": getattr(r, "memory_hit_count", 1),
"memory_primary_evidence_path": getattr(r, "memory_primary_evidence_path", None),
"memory_supporting_paths": getattr(r, "memory_supporting_paths", None),
"tags": getattr(r, "tags", None),
}

Expand All @@ -1029,6 +1031,8 @@ async def _handle_explain_results(arguments: dict, backend, storage) -> list[Tex
"memory_rollup": {
"memory_hit_count": getattr(r, "memory_hit_count", 1),
"boost": round(r.audit.memory_rollup_boost, 6),
"primary_evidence_path": r.audit.memory_primary_evidence_path,
"supporting_paths": list(r.audit.memory_supporting_paths),
},
}
else:
Expand Down
2 changes: 1 addition & 1 deletion src/recallforge/storage/indexing_ops.py
Original file line number Diff line number Diff line change
Expand Up @@ -132,7 +132,7 @@ def _parse_generated_media_tags(self, raw: str) -> List[str]:
if not text:
return []

fenced_match = re.match(r"^```(?:[A-Za-z0-9_+-]+)?\s*\n?(.*?)\n?```$", text, flags=re.DOTALL)
fenced_match = re.search(r"```(?:[A-Za-z0-9_+-]+)?\s*(.*?)\s*```", text, flags=re.DOTALL)
if fenced_match:
text = fenced_match.group(1).strip()

Expand Down
14 changes: 14 additions & 0 deletions tests/test_config_tools.py
Original file line number Diff line number Diff line change
Expand Up @@ -510,6 +510,8 @@ async def test_explain_results_surfaces_memory_rollup_provenance(self):
memory_role="root",
memory_root_path="notes/demo.md",
memory_hit_count=3,
memory_primary_evidence_path="notes/demo.md::section:0001",
memory_supporting_paths=["notes/demo.md::image:0001"],
audit=SearchAudit(
filepath="notes/demo.md",
content_type="text",
Expand All @@ -521,6 +523,8 @@ async def test_explain_results_surfaces_memory_rollup_provenance(self):
blend_weights={"rrf": 0.8, "rerank": 0.2},
media_compensation_applied=False,
memory_rollup_boost=1.06,
memory_primary_evidence_path="notes/demo.md::section:0001",
memory_supporting_paths=["notes/demo.md::image:0001"],
final_blended_score=0.8123,
),
)
Expand All @@ -536,8 +540,18 @@ async def test_explain_results_surfaces_memory_rollup_provenance(self):
self.assertEqual(explained["memory_id"], "mem-123")
self.assertEqual(explained["memory_hit_count"], 3)
self.assertEqual(explained["memory_root_path"], "notes/demo.md")
self.assertEqual(explained["memory_primary_evidence_path"], "notes/demo.md::section:0001")
self.assertEqual(explained["memory_supporting_paths"], ["notes/demo.md::image:0001"])
self.assertEqual(explained["provenance"]["memory_rollup"]["memory_hit_count"], 3)
self.assertAlmostEqual(explained["provenance"]["memory_rollup"]["boost"], 1.06)
self.assertEqual(
explained["provenance"]["memory_rollup"]["primary_evidence_path"],
"notes/demo.md::section:0001",
)
self.assertEqual(
explained["provenance"]["memory_rollup"]["supporting_paths"],
["notes/demo.md::image:0001"],
)

async def test_search_file_path_routes_through_text_query(self):
backend = _make_backend()
Expand Down
10 changes: 5 additions & 5 deletions tests/test_search_batch.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@

import os
import sys
import time
import unittest
from dataclasses import dataclass
from typing import List, Dict, Any, Optional
Expand Down Expand Up @@ -567,12 +568,11 @@ def test_same_document_merges_tags_deterministically(self):
],
]

call_idx = [0]

def mock_search(self, query):
idx = call_idx[0]
call_idx[0] += 1
return results_list[idx]
if query == "query one":
time.sleep(0.05)
return results_list[0]
return results_list[1]

with patch.object(HybridSearcher, '__init__', lambda self, **kwargs: None):
with patch.object(HybridSearcher, 'search', mock_search):
Expand Down
48 changes: 48 additions & 0 deletions tests/test_search_pipeline.py
Original file line number Diff line number Diff line change
Expand Up @@ -457,6 +457,54 @@ def test_memory_rollup_merges_tags_from_sibling_assets(self):
self.assertEqual(len(rolled), 1)
self.assertEqual(rolled[0].tags, ["diagram", "meeting notes"])

def test_memory_rollup_preserves_collection_qualified_filepath(self):
    """Rolling up sibling hits of one memory keeps the collection-qualified
    recallforge:// root as the result filepath, while recording the top-scoring
    child as primary evidence and the remaining child as a supporting path."""
    searcher = HybridSearcher(backend=StubBackend(), storage=StubStorage(), limit=12)

    # One root asset plus two derived children (transcript + frame) of the
    # same video memory; the transcript child deliberately outscores the root.
    root = _make_search_result("recallforge://alpha/memories/demo.mp4", 0.2, "vec", "video")
    child = _make_search_result(
        "recallforge://alpha/memories/demo.mp4::transcript:0001",
        0.9,
        "fts",
        "text",
    )
    sibling = _make_search_result(
        "recallforge://alpha/memories/demo.mp4::frame:0001",
        0.6,
        "vec",
        "image",
    )
    # All three hits share the same memory identity so the blend step groups them.
    for item in (root, child, sibling):
        item.collection = "alpha"
        item.memory_id = "memory-evidence"
        item.memory_root_path = "memories/demo.mp4"
    root.memory_role = "root"
    child.memory_role = "child"
    sibling.memory_role = "child"
    root.body = "Canonical demo video summary."

    # Blend with the children listed before the root to prove grouping does
    # not depend on input order; the dict carries each hit's blended score.
    blended = searcher._blend_scores(
        [child, sibling, root],
        {
            child.filepath: 0.9,
            sibling.filepath: 0.6,
            root.filepath: 0.2,
        },
    )

    # The three hits collapse into a single representative result.
    self.assertEqual(len(blended), 1)
    result = blended[0]
    # Representative keeps the scheme-qualified root path and a human display path.
    self.assertEqual(result.filepath, "recallforge://alpha/memories/demo.mp4")
    self.assertEqual(result.display_path, "alpha/memories/demo.mp4")
    self.assertEqual(result.memory_root_path, "memories/demo.mp4")
    # Highest-scoring child (the transcript) is surfaced as primary evidence.
    self.assertEqual(
        result.memory_primary_evidence_path,
        "recallforge://alpha/memories/demo.mp4::transcript:0001",
    )
    # Remaining sibling (the frame) is listed as supporting evidence only.
    self.assertEqual(
        result.memory_supporting_paths,
        ["recallforge://alpha/memories/demo.mp4::frame:0001"],
    )


class TestParallelSearchTaskCapture(unittest.TestCase):
def test_parallel_search_captures_original_vector(self):
Expand Down
27 changes: 27 additions & 0 deletions tests/test_storage.py
Original file line number Diff line number Diff line change
Expand Up @@ -1362,6 +1362,33 @@ def fenced_json(_prompt: str, max_tokens: int = 60) -> str:
["diagram", "hidden layers", "neural network"],
)

def test_generated_media_tags_extract_fenced_json_from_wrapped_text(self):
    """Tag generation still parses a ```json fenced block even when the model
    wraps it in surrounding prose instead of replying with the fence alone."""
    embedder = CaptioningEmbedder()

    # Simulate a chatty model response: prose preamble, then the fenced JSON
    # array of tags. max_tokens is accepted to match the real call signature.
    def wrapped_fenced_json(_prompt: str, max_tokens: int = 60) -> str:
        return (
            "Here are the tags:\n"
            "```json\n"
            '["diagram", "hidden layers", "neural network"]\n'
            "```"
        )

    embedder.generate_text = wrapped_fenced_json

    self.backend.index_image(
        path=self.image_path,
        collection="test",
        embed_func=embedder,
        caption_media=True,
    )

    # Exactly one image row is indexed, and its stored tags JSON round-trips
    # to the array that was inside the fence (prose stripped away).
    rows = self.backend._embeddings_table.search().where("content_type = 'image'").to_list()
    self.assertEqual(len(rows), 1)
    self.assertEqual(
        json.loads(rows[0].get("tags") or "[]"),
        ["diagram", "hidden layers", "neural network"],
    )

def test_index_video_keeps_parent_memory_and_links_children(self):
embedder = CaptioningEmbedder()
logical_path = str(Path(self.video_path).expanduser().resolve())
Expand Down
29 changes: 19 additions & 10 deletions tests/test_watch_folder.py
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,15 @@ def embed_image(self, image):
return [0.0] * 8


def _wait_until(predicate, timeout: float = 3.0, interval: float = 0.05) -> bool:
deadline = time.time() + timeout
while time.time() < deadline:
if predicate():
return True
time.sleep(interval)
return predicate()


def test_watch_folder_create_and_modify(tmp_path):
storage = FakeStorage()
backend = FakeBackend()
Expand All @@ -58,15 +67,16 @@ def test_watch_folder_create_and_modify(tmp_path):

f = watched / "note.md"
f.write_text("v1", encoding="utf-8")
time.sleep(0.35)

f.write_text("v2", encoding="utf-8")
time.sleep(0.35)
assert _wait_until(
lambda: bool(storage.upserts) and storage.upserts[-1]["text"] == "v2"
)

daemon.stop_watch(watch_id)

assert len(storage.upserts) >= 2
assert storage.upserts[0]["path"] == "note.md"
assert storage.upserts
assert storage.upserts[-1]["path"] == "note.md"
assert storage.upserts[-1]["text"] == "v2"


Expand All @@ -89,10 +99,9 @@ def test_watch_folder_delete(tmp_path):
)

watch_id = daemon.start_watch(config)
time.sleep(0.2)

f.unlink()
time.sleep(0.35)
assert _wait_until(lambda: ("old.md", "default", False) in storage.deletes)

daemon.stop_watch(watch_id)

Expand Down Expand Up @@ -145,9 +154,9 @@ def test_watch_folder_image_uses_logical_path(tmp_path):

image_path = watched / "diagram.png"
image_path.write_bytes(b"fake image bytes")
time.sleep(1.0)
assert _wait_until(lambda: bool(storage.image_indexes))
image_path.unlink()
time.sleep(1.0)
assert _wait_until(lambda: ("diagram.png", "default", False) in storage.deletes)

daemon.stop_watch(watch_id)

Expand Down Expand Up @@ -175,9 +184,9 @@ def test_watch_folder_document_uses_logical_path_and_child_cleanup(tmp_path):

document_path = watched / "notes.docx"
document_path.write_bytes(b"placeholder")
time.sleep(1.0)
assert _wait_until(lambda: bool(storage.document_indexes))
document_path.unlink()
time.sleep(1.0)
assert _wait_until(lambda: ("notes.docx", "default", True) in storage.deletes)

daemon.stop_watch(watch_id)

Expand Down
Loading