CLAUDE.md: 21 changes (18 additions & 3 deletions)
@@ -41,14 +41,19 @@ All routes are mounted under `/api` via `app/api.py`.
 
 ```
 POST /api/documents/upload
+  → compute SHA-256 content_hash of file bytes
+  → find_document_by_hash() — if match, return existing doc (dedup short-circuit)
   → save file to /tmp/cognee_uploads/
-  → create_document() in Supabase (status=processing)
+  → create_document(content_hash=...) in Supabase (status=processing)
   → run_pipeline() in background:
       → upload_to_r2() (raw file to Cloudflare R2)
       → LLM-based client name + document type classification
       → cognee.add(file_path, dataset_name=client_name)
       → cognee.cognify(datasets=[client_name])
-      → cognee.search(SearchType.CHUNKS) × 3 for summary/insights/entities
+      → summary: cognee.search(SearchType.SUMMARIES, datasets=[client_name])
+      → insights + entities: read this dataset's subgraph directly via
+        get_dataset_related_nodes() / get_dataset_related_edges()
+        (keyed on Node.slug, NOT Node.id — edges reference the slug)
       → write results to Supabase (status=completed)
 
 GET /api/documents/search?q=...&dataset=...&search_type=...
@@ -72,7 +77,7 @@ GET /api/health — Supabase connectivity check
 - `app/services/ingest.py` — `check_cognee_storage()` (startup writability check for `.cognee_system/`)
 - `app/services/cognee_service.py` — `search_knowledge_graph()` (used by `/documents/search` route)
 - `app/services/document_pipeline.py` — `run_pipeline()` (background ingest orchestration)
-- `app/services/document_metadata_service.py` — Supabase CRUD for document records + `recover_stale_documents()`
+- `app/services/document_metadata_service.py` — Supabase CRUD for document records, `find_document_by_hash()` for dedup, `recover_stale_documents()`
 - `app/services/graph_service.py` — `get_graph_data()` for D3 visualization
 - `app/services/storage.py` — `upload_to_r2()` and `get_presigned_url()` for Cloudflare R2
 - `app/services/supabase_check.py` — `wait_for_supabase()` (startup health check)
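
The dedup short-circuit that `find_document_by_hash()` enables is small enough to sketch. A minimal, runnable version is below; the in-memory dict stands in for the Supabase `documents` table, and the exact route/helper signatures are assumptions rather than the real API:

```python
import hashlib

# Stand-in for the Supabase lookup; the real route calls
# find_document_by_hash() against the documents table.
_DOCS_BY_HASH: dict[str, dict] = {}


def ingest(file_bytes: bytes, filename: str) -> dict:
    """Dedup short-circuit: hash first, look up, only then create."""
    content_hash = hashlib.sha256(file_bytes).hexdigest()
    existing = _DOCS_BY_HASH.get(content_hash)
    if existing is not None:
        return existing  # same bytes seen before: skip the whole pipeline
    doc = {"filename": filename, "content_hash": content_hash, "status": "processing"}
    _DOCS_BY_HASH[content_hash] = doc
    return doc


# A byte-identical re-upload returns the existing record, even under a new name.
assert ingest(b"abc", "a.txt") is ingest(b"abc", "copy-of-a.txt")
```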
@@ -110,6 +115,15 @@ Point `.env` at the local Supabase:
 - `SUPABASE_URL=http://127.0.0.1:54321`
 - `SUPABASE_SERVICE_ROLE_KEY=<value from "supabase status -o env">`
 
+## Resetting local state
+```bash
+# Wipes Cloudflare R2, Supabase docs, Cognee graph (venv + root), pgvector schema
+cd backend && python scripts/clear_all.py --yes
+```
+Note: Cognee resolves its `.cognee_system/` path relative to the installed
+package, so the graph DB lives inside the venv. `clear_all.py` handles both
+the venv path and the backend-root fallback.
+
 ## Running tests
 ```bash
 cd backend && pytest
@@ -188,3 +202,4 @@ CLOUDFLARE_R2_ENDPOINT, CLOUDFLARE_R2_ACCESS_KEY_ID, CLOUDFLARE_R2_SECRET_KEY, C
 - Search endpoint defaults to `SearchType.GRAPH_COMPLETION`
 - Allowed upload extensions: `.pdf`, `.csv`, `.txt` — max 5 files per request
 - Stale documents (stuck in `processing` >30 min) are auto-recovered to `failed` on startup
+- Uploads compute SHA-256 `content_hash` and short-circuit via `find_document_by_hash()` before processing
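
For orientation before the pipeline diff below: the insights/entities path in the flow above boils down to the following shape. The two `cognee.modules.graph.methods` imports and the slug-keyed lookup come straight from `document_pipeline.py` in this PR; the label fallback and structural-type filtering are omitted here, so treat this as a simplified sketch, not the real implementation:

```python
from cognee.modules.graph.methods import (
    get_dataset_related_edges,
    get_dataset_related_nodes,
)


async def dataset_triplets(dataset_id) -> list[str]:
    """Read one dataset's subgraph and render its edges as readable triplets."""
    nodes = await get_dataset_related_nodes(dataset_id)
    edges = await get_dataset_related_edges(dataset_id)
    # Edges point at Node.slug, not Node.id, so key the lookup on slug.
    by_slug = {str(n.slug): n for n in nodes}
    triplets: list[str] = []
    for e in edges:
        src = by_slug.get(str(e.source_node_id))
        dst = by_slug.get(str(e.destination_node_id))
        if src is None or dst is None:
            continue
        rel = str(e.relationship_name or "related_to").replace("_", " ")
        # The real code falls back to node attributes when label is empty.
        triplets.append(f"{src.label} → {rel} → {dst.label}")
    return triplets
```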
backend/app/services/document_pipeline.py: 184 changes (107 additions & 77 deletions)
@@ -82,48 +82,14 @@ async def _call_llm(prompt: str, max_retries: int = 6) -> str:
return "" # pragma: no cover – loop always returns or raises


_BULLET_PREFIXES = ("- ", "* ", "• ", "– ", "— ")


def _split_bulleted(raw: list[str]) -> list[str]:
"""Split bulleted/numbered LLM answers into discrete items.

GRAPH_COMPLETION returns one narrative string per result; the UI renders
a list, so we split on newlines and strip leading bullet/number markers.
"""
items: list[str] = []
for block in raw:
for line in block.splitlines():
line = line.strip()
if not line:
continue
for prefix in _BULLET_PREFIXES:
if line.startswith(prefix):
line = line[len(prefix) :].strip()
break
else:
# Strip "1. ", "2) " style numeric prefixes
head, sep, rest = line.partition(" ")
if sep and head.rstrip(".)").isdigit():
line = rest.strip()
if line:
items.append(line)
return items


def _extract_search_text(result) -> str:
"""Pull a plain string out of a Cognee SearchResult, dict, or raw value."""
if hasattr(result, "search_result"):
payload = result.search_result
elif isinstance(result, dict):
payload = result.get("search_result", result)
else:
payload = result
if isinstance(payload, list):
return " ".join(str(p) for p in payload)
def _extract_summary_text(result) -> str:
"""SUMMARIES returns payload dicts with a 'text' field."""
if isinstance(result, dict):
return result.get("text") or result.get("summary") or ""
payload = getattr(result, "payload", None)
if isinstance(payload, dict):
return payload.get("text", "") or str(payload)
return str(payload) if payload is not None else ""
return payload.get("text") or payload.get("summary") or ""
return getattr(result, "text", "") or ""


# ---------------------------------------------------------------------------
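
A quick check of the shapes `_extract_summary_text` is written to tolerate. The dict and attribute variants mirror the function's branches; cognee's actual result classes are not shown in this diff, so the `SimpleNamespace` stand-ins are assumptions. Pasted after the function above, this runs as-is:

```python
from types import SimpleNamespace

results = [
    {"text": "Plain dict with a text field."},
    {"summary": "Dict with only a summary field."},
    SimpleNamespace(payload={"text": "Object carrying a payload dict."}),
    SimpleNamespace(payload=None, text="Object with a bare text attribute."),
    {"other": "no usable field"},  # extracts as "" and gets filtered out
]

texts = [t for t in (_extract_summary_text(r) for r in results) if t]
summary = "\n\n".join(texts[:5])  # same top-5 join the pipeline does in Step 5
assert len(texts) == 4
```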
@@ -205,7 +171,7 @@ def _now() -> str:
await _update(progress_stage="building_graph")

# ------------------------------------------------------------------
# Step 4 – Cognify (build knowledge graph)
# Step 4 – Cognify (build knowledge graph for this dataset)
# ------------------------------------------------------------------
await asyncio.wait_for(
cognee.cognify(datasets=[client_name]),
@@ -214,56 +180,120 @@ def _now() -> str:
await _update(progress_stage="analyzing")

# ------------------------------------------------------------------
# Step 5 – Extract summary
# Step 5 – Summary via SearchType.SUMMARIES (pre-generated by cognify)
# ------------------------------------------------------------------
summary_results = await asyncio.wait_for(
cognee.search(
query_text="Provide a concise executive summary of this document.",
query_type=SearchType.GRAPH_SUMMARY_COMPLETION,
query_text=original_filename,
query_type=SearchType.SUMMARIES,
datasets=[client_name],
),
timeout=_COGNEE_TIMEOUT,
)
summary = _extract_search_text(summary_results[0]) if summary_results else ""
summary_texts = [
t for t in (_extract_summary_text(r) for r in (summary_results or [])) if t
]
summary = "\n\n".join(summary_texts[:5])

# ------------------------------------------------------------------
# Step 6 – Extract insights (key relationships & takeaways)
# Step 6 – Insights + Entities scoped to this dataset.
# Cognee tags every node/edge with `dataset_id` in its relational
# store, so we can read this dataset's subgraph directly — no
# global snapshot diff, no concurrent-upload race.
# ------------------------------------------------------------------
await _update(progress_stage="extracting_insights")
insights_results = await asyncio.wait_for(
cognee.search(
query_text=(
"What are the key insights, relationships, and notable "
"takeaways from this document? Return each as a separate "
"bullet point."
),
query_type=SearchType.GRAPH_COMPLETION,
datasets=[client_name],
),
timeout=_COGNEE_TIMEOUT,

from cognee.modules.data.methods.get_authorized_dataset_by_name import (
get_authorized_dataset_by_name,
)
insights: list[str] = _split_bulleted(
[_extract_search_text(r) for r in (insights_results or [])]
from cognee.modules.graph.methods import (
get_dataset_related_edges,
get_dataset_related_nodes,
)
from cognee.modules.users.methods.get_default_user import get_default_user

# ------------------------------------------------------------------
# Step 7 – Extract entities
# ------------------------------------------------------------------
entity_results = await asyncio.wait_for(
cognee.search(
query_text=(
"List the key named entities in this document "
"(people, organizations, products, locations, identifiers). "
"Return one entity per line, no descriptions."
),
query_type=SearchType.GRAPH_COMPLETION,
datasets=[client_name],
),
timeout=_COGNEE_TIMEOUT,
)
entities: list[str] = _split_bulleted(
[_extract_search_text(r) for r in (entity_results or [])]
insights: list[str] = []
entities: list[str] = []

cognee_user = await get_default_user()
dataset = await get_authorized_dataset_by_name(
dataset_name=client_name, user=cognee_user, permission_type="read"
)
if dataset is None:
logger.warning(
"No Cognee dataset found for client_name=%r (user=%s); "
"insights/entities will be empty",
client_name,
getattr(cognee_user, "email", "?"),
)
ds_nodes, ds_edges = [], []
else:
ds_nodes = await get_dataset_related_nodes(dataset.id)
ds_edges = await get_dataset_related_edges(dataset.id)
logger.info(
"Dataset %r (id=%s): %d nodes, %d edges",
client_name,
dataset.id,
len(ds_nodes),
len(ds_edges),
)

if ds_nodes or ds_edges:
_STRUCTURAL_TYPES = {
"TextDocument",
"DocumentChunk",
"TextSummary",
"IndexSchema",
"Document",
}

def _node_label(node) -> str:
if node.label:
return str(node.label)
attrs = node.attributes or {}
return str(
attrs.get("name")
or attrs.get("text")
or attrs.get("label")
or node.id
)

# Edges reference nodes by `slug` (the DataPoint's original id),
# not by Node.id (which is a derived uuid5). See cognee's
# upsert_nodes / upsert_edges.
entity_nodes_by_slug: dict[str, object] = {}
for n in ds_nodes:
if (n.type or "") in _STRUCTURAL_TYPES:
continue
entity_nodes_by_slug[str(n.slug)] = n

seen_labels: set[str] = set()
for n in entity_nodes_by_slug.values():
label = _node_label(n).strip()
if label and label not in seen_labels:
seen_labels.add(label)
entities.append(label)

seen_triplets: set[str] = set()
for e in ds_edges:
sid = str(e.source_node_id)
tid = str(e.destination_node_id)
src = entity_nodes_by_slug.get(sid)
dst = entity_nodes_by_slug.get(tid)
if src is None or dst is None:
continue
source_label = _node_label(src).strip()
target_label = _node_label(dst).strip()
rel_label = (
str(e.relationship_name or "related_to").replace("_", " ").strip()
)
if not (source_label and target_label):
continue
triplet = f"{source_label} → {rel_label} → {target_label}"
if triplet in seen_triplets:
continue
seen_triplets.add(triplet)
insights.append(triplet)

# ------------------------------------------------------------------
# Step 8 – Write final state to DB
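
The slug-vs-id comment in the hunk above is the load-bearing detail: if the lookup were keyed on `Node.id`, every edge would silently miss. A tiny, runnable illustration of that failure mode; the uuid5 derivation and namespace here are assumptions about cognee's internals, following the `upsert_nodes` / `upsert_edges` reference in the comment:

```python
import uuid

NAMESPACE = uuid.NAMESPACE_OID  # stand-in; cognee's actual namespace may differ

slug = "datapoint-original-id-123"      # what edges reference
node_id = uuid.uuid5(NAMESPACE, slug)   # derived Node.id

edge_key = slug
nodes_keyed_by_id = {str(node_id): "node"}   # wrong index: every lookup misses
nodes_keyed_by_slug = {slug: "node"}         # right index: lookups hit

assert edge_key not in nodes_keyed_by_id
assert edge_key in nodes_keyed_by_slug
```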
backend/requirements.txt: 20 changes (16 additions & 4 deletions)
@@ -1,5 +1,7 @@
 # Web Framework & Server
-fastapi==0.119.0
+# fastapi bumped from 0.119.0 to close starlette<0.49 CVEs (fastapi 0.119
+# pins starlette<0.49.0; fastapi 0.120+ supports the patched starlette).
+fastapi>=0.120.0
 uvicorn[standard]==0.37.0
 
 # Supabase Client
@@ -19,8 +21,8 @@ ruff==0.8.4
 pytest>=8.0.0
 pytest-asyncio>=0.23.0
 
 # LLM Integration
-litellm>=1.52.0
+litellm>=1.83.0
 openai>=1.0.0
 
# Data Analysis
Expand All @@ -37,9 +39,19 @@ cognee[postgres,gemini]>=0.5.5
 kuzu>=0.11.3
 neo4j>=5.0.0
 asyncpg>=0.29.0
-python-multipart>=0.0.9
+python-multipart>=0.0.26
 boto3>=1.26.0
 
+# Transitive security constraints (pin minimum-safe versions of deps
+# pulled in by fastapi/cognee/etc. to close CVEs). Keep upper bound on
+# starlette to stay on fastapi 0.x's 0.x line.
+aiohttp>=3.13.4
+cryptography>=46.0.7
+pygments>=2.20.0
+pypdf>=6.10.2
+requests>=2.33.0
+starlette>=0.49.1,<1.0
+
 # docling==2.55.1
 # docling-core==2.48.4
 # docling-ibm-models==3.9.1
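
One way to sanity-check that an installed environment actually satisfies the minimum-safe pins added above (a sketch to run in CI after `pip install -r requirements.txt`; it assumes the `packaging` library is available, which it usually is wherever pip is):

```python
from importlib.metadata import version

from packaging.version import Version

# Floors copied from the "Transitive security constraints" block above.
MINIMUMS = {
    "aiohttp": "3.13.4",
    "cryptography": "46.0.7",
    "pygments": "2.20.0",
    "pypdf": "6.10.2",
    "requests": "2.33.0",
    "starlette": "0.49.1",
}

for name, floor in MINIMUMS.items():
    installed = Version(version(name))
    assert installed >= Version(floor), f"{name} {installed} < {floor}"

# And the upper bound that keeps starlette on the 0.x line fastapi expects.
assert Version(version("starlette")) < Version("1.0")
```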