diff --git a/CLAUDE.md b/CLAUDE.md index e5f8458..33edc2f 100644 --- a/CLAUDE.md +++ b/CLAUDE.md @@ -41,14 +41,19 @@ All routes are mounted under `/api` via `app/api.py`. ``` POST /api/documents/upload + → compute SHA-256 content_hash of file bytes + → find_document_by_hash() — if match, return existing doc (dedup short-circuit) → save file to /tmp/cognee_uploads/ - → create_document() in Supabase (status=processing) + → create_document(content_hash=...) in Supabase (status=processing) → run_pipeline() in background: → upload_to_r2() (raw file to Cloudflare R2) → LLM-based client name + document type classification → cognee.add(file_path, dataset_name=client_name) → cognee.cognify(datasets=[client_name]) - → cognee.search(SearchType.CHUNKS) × 3 for summary/insights/entities + → summary: cognee.search(SearchType.SUMMARIES, datasets=[client_name]) + → insights + entities: read this dataset's subgraph directly via + get_dataset_related_nodes() / get_dataset_related_edges() + (keyed on Node.slug, NOT Node.id — edges reference the slug) → write results to Supabase (status=completed) GET /api/documents/search?q=...&dataset=...&search_type=... @@ -72,7 +77,7 @@ GET /api/health — Supabase connectivity check - `app/services/ingest.py` — `check_cognee_storage()` (startup writability check for `.cognee_system/`) - `app/services/cognee_service.py` — `search_knowledge_graph()` (used by `/documents/search` route) - `app/services/document_pipeline.py` — `run_pipeline()` (background ingest orchestration) -- `app/services/document_metadata_service.py` — Supabase CRUD for document records + `recover_stale_documents()` +- `app/services/document_metadata_service.py` — Supabase CRUD for document records, `find_document_by_hash()` for dedup, `recover_stale_documents()` - `app/services/graph_service.py` — `get_graph_data()` for D3 visualization - `app/services/storage.py` — `upload_to_r2()` and `get_presigned_url()` for Cloudflare R2 - `app/services/supabase_check.py` — `wait_for_supabase()` (startup health check) @@ -110,6 +115,15 @@ Point `.env` at the local Supabase: - `SUPABASE_URL=http://127.0.0.1:54321` - `SUPABASE_SERVICE_ROLE_KEY=` +## Resetting local state +```bash +# Wipes Cloudflare R2, Supabase docs, Cognee graph (venv + root), pgvector schema +cd backend && python scripts/clear_all.py --yes +``` +Note: Cognee resolves its `.cognee_system/` path relative to the installed +package, so the graph DB lives inside the venv. `clear_all.py` handles both +the venv path and the backend-root fallback. + ## Running tests ```bash cd backend && pytest @@ -188,3 +202,4 @@ CLOUDFLARE_R2_ENDPOINT, CLOUDFLARE_R2_ACCESS_KEY_ID, CLOUDFLARE_R2_SECRET_KEY, C - Search endpoint defaults to `SearchType.GRAPH_COMPLETION` - Allowed upload extensions: `.pdf`, `.csv`, `.txt` — max 5 files per request - Stale documents (stuck in `processing` >30 min) are auto-recovered to `failed` on startup +- Uploads compute SHA-256 `content_hash` and short-circuit via `find_document_by_hash()` before processing diff --git a/backend/app/services/document_pipeline.py b/backend/app/services/document_pipeline.py index 762ba44..513c4aa 100644 --- a/backend/app/services/document_pipeline.py +++ b/backend/app/services/document_pipeline.py @@ -82,48 +82,14 @@ async def _call_llm(prompt: str, max_retries: int = 6) -> str: return "" # pragma: no cover – loop always returns or raises -_BULLET_PREFIXES = ("- ", "* ", "• ", "– ", "— ") - - -def _split_bulleted(raw: list[str]) -> list[str]: - """Split bulleted/numbered LLM answers into discrete items. 
-
-    GRAPH_COMPLETION returns one narrative string per result; the UI renders
-    a list, so we split on newlines and strip leading bullet/number markers.
-    """
-    items: list[str] = []
-    for block in raw:
-        for line in block.splitlines():
-            line = line.strip()
-            if not line:
-                continue
-            for prefix in _BULLET_PREFIXES:
-                if line.startswith(prefix):
-                    line = line[len(prefix) :].strip()
-                    break
-            else:
-                # Strip "1. ", "2) " style numeric prefixes
-                head, sep, rest = line.partition(" ")
-                if sep and head.rstrip(".)").isdigit():
-                    line = rest.strip()
-            if line:
-                items.append(line)
-    return items
-
-
-def _extract_search_text(result) -> str:
-    """Pull a plain string out of a Cognee SearchResult, dict, or raw value."""
-    if hasattr(result, "search_result"):
-        payload = result.search_result
-    elif isinstance(result, dict):
-        payload = result.get("search_result", result)
-    else:
-        payload = result
-    if isinstance(payload, list):
-        return " ".join(str(p) for p in payload)
+def _extract_summary_text(result) -> str:
+    """SUMMARIES returns payload dicts with a 'text' field."""
+    if isinstance(result, dict):
+        return result.get("text") or result.get("summary") or ""
+    payload = getattr(result, "payload", None)
     if isinstance(payload, dict):
-        return payload.get("text", "") or str(payload)
-    return str(payload) if payload is not None else ""
+        return payload.get("text") or payload.get("summary") or ""
+    return getattr(result, "text", "") or ""
 
 
 # ---------------------------------------------------------------------------
@@ -205,7 +171,7 @@ def _now() -> str:
     await _update(progress_stage="building_graph")
 
     # ------------------------------------------------------------------
-    # Step 4 – Cognify (build knowledge graph)
+    # Step 4 – Cognify (build knowledge graph for this dataset)
     # ------------------------------------------------------------------
     await asyncio.wait_for(
         cognee.cognify(datasets=[client_name]),
@@ -214,56 +180,124 @@ def _now() -> str:
     await _update(progress_stage="analyzing")
 
     # ------------------------------------------------------------------
-    # Step 5 – Extract summary
+    # Step 5 – Summary via SearchType.SUMMARIES (pre-generated by cognify)
    # ------------------------------------------------------------------
     summary_results = await asyncio.wait_for(
         cognee.search(
-            query_text="Provide a concise executive summary of this document.",
-            query_type=SearchType.GRAPH_SUMMARY_COMPLETION,
+            query_text=original_filename,
+            query_type=SearchType.SUMMARIES,
             datasets=[client_name],
         ),
         timeout=_COGNEE_TIMEOUT,
     )
-    summary = _extract_search_text(summary_results[0]) if summary_results else ""
+    summary_texts = [
+        t for t in (_extract_summary_text(r) for r in (summary_results or [])) if t
+    ]
+    summary = "\n\n".join(summary_texts[:5])
 
     # ------------------------------------------------------------------
-    # Step 6 – Extract insights (key relationships & takeaways)
+    # Step 6 – Insights + Entities scoped to this dataset.
+    #   Cognee tags every node/edge with `dataset_id` in its relational
+    #   store, so we can read this dataset's subgraph directly — no
+    #   global snapshot diff, no concurrent-upload race.
     # ------------------------------------------------------------------
     await _update(progress_stage="extracting_insights")
-    insights_results = await asyncio.wait_for(
-        cognee.search(
-            query_text=(
-                "What are the key insights, relationships, and notable "
-                "takeaways from this document? Return each as a separate "
-                "bullet point."
- ), - query_type=SearchType.GRAPH_COMPLETION, - datasets=[client_name], - ), - timeout=_COGNEE_TIMEOUT, + + from cognee.modules.data.methods.get_authorized_dataset_by_name import ( + get_authorized_dataset_by_name, ) - insights: list[str] = _split_bulleted( - [_extract_search_text(r) for r in (insights_results or [])] + from cognee.modules.graph.methods import ( + get_dataset_related_edges, + get_dataset_related_nodes, ) + from cognee.modules.users.methods.get_default_user import get_default_user - # ------------------------------------------------------------------ - # Step 7 – Extract entities - # ------------------------------------------------------------------ - entity_results = await asyncio.wait_for( - cognee.search( - query_text=( - "List the key named entities in this document " - "(people, organizations, products, locations, identifiers). " - "Return one entity per line, no descriptions." - ), - query_type=SearchType.GRAPH_COMPLETION, - datasets=[client_name], - ), - timeout=_COGNEE_TIMEOUT, - ) - entities: list[str] = _split_bulleted( - [_extract_search_text(r) for r in (entity_results or [])] + insights: list[str] = [] + entities: list[str] = [] + + cognee_user = await get_default_user() + dataset = await get_authorized_dataset_by_name( + dataset_name=client_name, user=cognee_user, permission_type="read" ) + if dataset is None: + logger.warning( + "No Cognee dataset found for client_name=%r (user=%s); " + "insights/entities will be empty", + client_name, + getattr(cognee_user, "email", "?"), + ) + ds_nodes, ds_edges = [], [] + else: + ds_nodes = await get_dataset_related_nodes(dataset.id) + ds_edges = await get_dataset_related_edges(dataset.id) + logger.info( + "Dataset %r (id=%s): %d nodes, %d edges", + client_name, + dataset.id, + len(ds_nodes), + len(ds_edges), + ) + + if ds_nodes or ds_edges: + _STRUCTURAL_TYPES = { + "TextDocument", + "DocumentChunk", + "TextSummary", + "IndexSchema", + "Document", + } + + def _node_label(node) -> str: + if node.label: + return str(node.label) + attrs = node.attributes or {} + return str( + attrs.get("name") + or attrs.get("text") + or attrs.get("label") + or node.id + ) + + # Edges reference nodes by `slug` (the DataPoint's original id), + # not by Node.id (which is a derived uuid5). See cognee's + # upsert_nodes / upsert_edges. 
+        entity_nodes_by_slug: dict[str, object] = {}
+        for n in ds_nodes:
+            if (n.type or "") in _STRUCTURAL_TYPES:
+                continue
+            entity_nodes_by_slug[str(n.slug)] = n
+
+        seen_labels: set[str] = set()
+        for n in entity_nodes_by_slug.values():
+            label = _node_label(n).strip()
+            if label and label not in seen_labels:
+                seen_labels.add(label)
+                entities.append(label)
+
+        seen_triplets: set[str] = set()
+        for e in ds_edges:
+            sid = str(e.source_node_id)
+            tid = str(e.destination_node_id)
+            src = entity_nodes_by_slug.get(sid)
+            dst = entity_nodes_by_slug.get(tid)
+            if src is None or dst is None:
+                continue
+            source_label = _node_label(src).strip()
+            target_label = _node_label(dst).strip()
+            rel_label = (
+                str(e.relationship_name or "related_to").replace("_", " ").strip()
+            )
+            if not (source_label and target_label):
+                continue
+            triplet = f"{source_label} → {rel_label} → {target_label}"
+            if triplet in seen_triplets:
+                continue
+            seen_triplets.add(triplet)
+            insights.append(triplet)
 
     # ------------------------------------------------------------------
     # Step 8 – Write final state to DB
diff --git a/backend/requirements.txt b/backend/requirements.txt
index b4b9b6e..e108c0f 100644
--- a/backend/requirements.txt
+++ b/backend/requirements.txt
@@ -1,5 +1,7 @@
 # Web Framework & Server
-fastapi==0.119.0
+# fastapi bumped from 0.119.0 to close starlette<0.49 CVEs (fastapi 0.119
+# pins starlette<0.49.0; fastapi 0.120+ supports the patched starlette).
+fastapi>=0.120.0
 uvicorn[standard]==0.37.0
 
 # Supabase Client
@@ -19,8 +21,8 @@ ruff==0.8.4
 pytest>=8.0.0
 pytest-asyncio>=0.23.0
 
-# LLM Integration
-litellm>=1.52.0
+# LLM Integration
+litellm>=1.83.0
 openai>=1.0.0
 
 # Data Analysis
@@ -37,9 +39,19 @@ cognee[postgres,gemini]>=0.5.5
 kuzu>=0.11.3
 neo4j>=5.0.0
 asyncpg>=0.29.0
-python-multipart>=0.0.9
+python-multipart>=0.0.26
 boto3>=1.26.0
 
+# Transitive security constraints (pin minimum-safe versions of deps
+# pulled in by fastapi/cognee/etc. to close CVEs). Cap starlette below
+# 1.0 so we stay on the 0.x line that fastapi 0.x supports.
+aiohttp>=3.13.4
+cryptography>=46.0.7
+pygments>=2.20.0
+pypdf>=6.10.2
+requests>=2.33.0
+starlette>=0.49.1,<1.0
+
 # docling==2.55.1
 # docling-core==2.48.4
 # docling-ibm-models==3.9.1
diff --git a/backend/scripts/clear_all.py b/backend/scripts/clear_all.py
new file mode 100644
index 0000000..4a12293
--- /dev/null
+++ b/backend/scripts/clear_all.py
@@ -0,0 +1,189 @@
+"""Clear all Cortex data: Cloudflare R2, Supabase, and the Cognee knowledge graph.
+
+Usage:
+    cd backend
+    python scripts/clear_all.py              # prompts before each step
+    python scripts/clear_all.py --yes        # no prompts
+    python scripts/clear_all.py --only r2    # r2 | supabase | cognee (repeatable)
+
+Reads credentials from the project-root `.env`.
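+
+Assumed per-step env vars (as read by the functions below): CLOUDFLARE_R2_*
+for r2, SUPABASE_URL + SUPABASE_SERVICE_ROLE_KEY for supabase, and
+VECTOR_DB_URL for the pgvector wipe; a step skips itself if its vars are unset.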
+""" + +from __future__ import annotations + +import argparse +import asyncio +import os +import shutil +import sys +from pathlib import Path + +from dotenv import load_dotenv + +REPO_ROOT = Path(__file__).resolve().parents[2] +BACKEND_ROOT = REPO_ROOT / "backend" +load_dotenv(REPO_ROOT / ".env") + + +def confirm(prompt: str, auto_yes: bool) -> bool: + if auto_yes: + return True + return input(f"{prompt} [y/N] ").strip().lower() in {"y", "yes"} + + +def clear_r2(auto_yes: bool) -> None: + import boto3 + from botocore.exceptions import ClientError + + endpoint = os.getenv("CLOUDFLARE_R2_ENDPOINT") + access_key = os.getenv("R2_ACCESS_KEY_ID") or os.getenv("CLOUDFLARE_R2_ACCESS_KEY_ID") + secret_key = os.getenv("R2_SECRET_KEY") or os.getenv("CLOUDFLARE_R2_SECRET_KEY") + bucket = os.getenv("CLOUDFLARE_R2_BUCKET_NAME") + + if not all([endpoint, access_key, secret_key, bucket]): + print("[r2] missing credentials — skipping") + return + + if not confirm(f"[r2] delete ALL objects in bucket '{bucket}'?", auto_yes): + print("[r2] skipped") + return + + s3 = boto3.client( + "s3", + endpoint_url=endpoint, + aws_access_key_id=access_key, + aws_secret_access_key=secret_key, + region_name="auto", + ) + + deleted = 0 + paginator = s3.get_paginator("list_objects_v2") + try: + for page in paginator.paginate(Bucket=bucket): + objs = page.get("Contents") or [] + if not objs: + continue + s3.delete_objects( + Bucket=bucket, + Delete={"Objects": [{"Key": o["Key"]} for o in objs]}, + ) + deleted += len(objs) + except ClientError as e: + print(f"[r2] error: {e}") + return + print(f"[r2] deleted {deleted} objects from '{bucket}'") + + +async def clear_supabase(auto_yes: bool) -> None: + url = os.getenv("SUPABASE_URL") + key = os.getenv("SUPABASE_SERVICE_ROLE_KEY") + if not url or not key: + print("[supabase] missing credentials — skipping") + return + + if not confirm("[supabase] truncate table 'cortex_documents'?", auto_yes): + print("[supabase] skipped") + return + + from supabase import acreate_client + + sb = await acreate_client(url, key) + res = await sb.table("cortex_documents").delete().neq("id", "00000000-0000-0000-0000-000000000000").execute() + print(f"[supabase] deleted {len(res.data or [])} rows from cortex_documents") + + +async def clear_cognee_local(auto_yes: bool) -> None: + # Cognee resolves system_root relative to the installed package, not CWD, + # so ask it directly where the graph/vector files live. 
+    cognee_system_paths: list[Path] = []
+    try:
+        from cognee.base_config import get_base_config
+
+        cognee_system_paths.append(Path(get_base_config().system_root_directory))
+    except Exception as e:
+        print(f"[cognee] could not resolve system_root via cognee ({e})")
+
+    targets = [
+        *cognee_system_paths,
+        BACKEND_ROOT / ".cognee_system",
+        BACKEND_ROOT / ".data_storage",
+        BACKEND_ROOT / "cortex_local.db",
+        BACKEND_ROOT / "cortex_local.db-shm",
+        BACKEND_ROOT / "cortex_local.db-wal",
+    ]
+    existing = [p for p in targets if p.exists()]
+    if not existing:
+        print("[cognee] no local storage to remove")
+    else:
+        if not confirm(f"[cognee] delete {len(existing)} local path(s): {[p.name for p in existing]}?", auto_yes):
+            print("[cognee] local delete skipped")
+        else:
+            for p in existing:
+                if p.is_dir():
+                    shutil.rmtree(p)
+                else:
+                    p.unlink()
+                print(f"[cognee] removed {p}")
+
+    vector_url = os.getenv("VECTOR_DB_URL")
+    if not vector_url:
+        print("[cognee] no VECTOR_DB_URL — skipping pgvector wipe")
+        return
+
+    if not confirm("[cognee] drop all tables in pgvector database (public schema)?", auto_yes):
+        print("[cognee] pgvector skipped")
+        return
+
+    try:
+        import asyncpg
+    except ImportError:
+        print("[cognee] asyncpg not installed — run `pip install asyncpg` to wipe pgvector")
+        return
+
+    dsn = vector_url.replace("postgresql+asyncpg://", "postgresql://").replace("postgresql+psycopg://", "postgresql://")
+
+    try:
+        conn = await asyncpg.connect(dsn)
+    except (OSError, asyncpg.PostgresError) as e:
+        print(f"[cognee] could not connect to pgvector ({e}) — skipping")
+        return
+    try:
+        await conn.execute("DROP SCHEMA public CASCADE; CREATE SCHEMA public;")
+    finally:
+        await conn.close()
+    print("[cognee] dropped and recreated public schema in pgvector DB")
+
+
+async def main() -> int:
+    parser = argparse.ArgumentParser(description=__doc__)
+    parser.add_argument("--yes", "-y", action="store_true", help="skip all confirmations")
+    parser.add_argument(
+        "--only",
+        action="append",
+        choices=["r2", "supabase", "cognee"],
+        help="run only the specified step(s); repeatable",
+    )
+    args = parser.parse_args()
+
+    steps = set(args.only) if args.only else {"r2", "supabase", "cognee"}
+
+    if "r2" in steps:
+        clear_r2(args.yes)
+    if "supabase" in steps:
+        await clear_supabase(args.yes)
+    if "cognee" in steps:
+        await clear_cognee_local(args.yes)
+
+    print("done.")
+    return 0
+
+
+if __name__ == "__main__":
+    sys.exit(asyncio.run(main()))
diff --git a/frontend/package-lock.json b/frontend/package-lock.json
index 7fc3632..ae638b0 100644
--- a/frontend/package-lock.json
+++ b/frontend/package-lock.json
@@ -1431,9 +1431,9 @@
       }
     },
     "node_modules/axios": {
-      "version": "1.14.0",
-      "resolved": "https://registry.npmjs.org/axios/-/axios-1.14.0.tgz",
-      "integrity": "sha512-3Y8yrqLSwjuzpXuZ0oIYZ/XGgLwUIBU3uLvbcpb0pidD9ctpShJd43KSlEEkVQg6DS0G9NKyzOvBfUtDKEyHvQ==",
+      "version": "1.15.0",
+      "resolved": "https://registry.npmjs.org/axios/-/axios-1.15.0.tgz",
+      "integrity": "sha512-wWyJDlAatxk30ZJer+GeCWS209sA42X+N5jU2jy6oHTp7ufw8uzUTVFBX9+wTfAlhiJXGS0Bq7X6efruWjuK9Q==",
       "license": "MIT",
       "dependencies": {
         "follow-redirects": "^1.15.11",
@@ -2110,9 +2110,9 @@
       }
     },
     "node_modules/follow-redirects": {
-      "version": "1.15.11",
-      "resolved": "https://registry.npmjs.org/follow-redirects/-/follow-redirects-1.15.11.tgz",
-      "integrity": "sha512-deG2P0JfjrTxl50XGCDyfI97ZGVCxIpfKYmfyrQ54n5FO/0gfIES8C/Psl6kWVDolizcaaxZJnTS0QSMxvnsBQ==",
+      "version": "1.16.0",
+      "resolved": "https://registry.npmjs.org/follow-redirects/-/follow-redirects-1.16.0.tgz",
+      "integrity": "sha512-y5rN/uOsadFT/JfYwhxRS5R7Qce+g3zG97+JrtFZlC9klX/W5hD7iiLzScI4nZqUS7DNUdhPgw4xI8W2LuXlUw==",
       "funding": [
         {
           "type": "individual",
@@ -3382,9 +3382,9 @@
       "license": "MIT"
     },
     "node_modules/vite": {
-      "version": "6.4.1",
-      "resolved": "https://registry.npmjs.org/vite/-/vite-6.4.1.tgz",
-      "integrity": "sha512-+Oxm7q9hDoLMyJOYfUYBuHQo+dkAloi33apOPP56pzj+vsdJDzr+j1NISE5pyaAuKL4A3UD34qd0lx5+kfKp2g==",
+      "version": "6.4.2",
+      "resolved": "https://registry.npmjs.org/vite/-/vite-6.4.2.tgz",
+      "integrity": "sha512-2N/55r4JDJ4gdrCvGgINMy+HH3iRpNIz8K6SFwVsA+JbQScLiC+clmAxBgwiSPgcG9U15QmvqCGWzMbqda5zGQ==",
       "dev": true,
       "license": "MIT",
       "dependencies": {