# NOTE(review): removed GitHub web-UI scrape residue (page chrome and the
# 1-127 line-number gutter) that preceded the actual main.py source below.
"""
GraphRAG pipeline entry point.
Typical flow::
chunk → split book into LangChain Documents + save JSON
extract → LLM structured output per chunk + save JSON
load → Neo4j upsert (passages, entities, events, edges)
query → retrieve from graph + answer (see ``graphrag_chain``)
``--steps`` runs a subset and **reuses in-memory state** when steps are chained in one
process (e.g. chunk then extract passes ``docs`` without re-reading the book).
"""
from __future__ import annotations
import argparse
from pathlib import Path
from src import config
from src.ingest_chunks import DEFAULT_BOOK_PATH, ingest_book, save_chunks_json
from src.extract_graph import (
DEFAULT_EXTRACTIONS_JSON,
extract_all_chunks,
load_extractions,
save_extractions,
)
from src.load_to_neo4j import load_extraction_batch, load_passages_from_docs
from src.graphrag_chain import answer_question
def run_chunk(book_path: Path | None = None, out_json: Path | None = None) -> list:
    """Step 1: Split the source book into passage Documents and persist them as JSON.

    Args:
        book_path: Book file to ingest; falls back to ``DEFAULT_BOOK_PATH`` when None.
        out_json: Destination for the chunk JSON; the saver chooses its default when None.

    Returns:
        The chunked LangChain Documents, so later steps can reuse them in-memory.
    """
    source = book_path or DEFAULT_BOOK_PATH
    docs = ingest_book(book_path=source, chunk_size=1000, chunk_overlap=150)
    saved_to = save_chunks_json(docs, out_path=out_json)
    print(f"[chunk] {len(docs)} passages saved to {saved_to}")
    return docs
def run_extract(docs: list | None = None, limit: int | None = None, out_json: Path | None = None) -> list[dict]:
    """Step 2: Extract triples from chunks via OpenAI; save JSON; return extractions.

    Args:
        docs: Pre-chunked Documents; re-ingested from ``DEFAULT_BOOK_PATH`` when None.
        limit: Optional cap on the number of chunks to process (testing aid).
        out_json: Output path; ``DEFAULT_EXTRACTIONS_JSON`` when None.

    Returns:
        The list of extraction dicts that was saved.
    """
    if docs is None:
        docs = ingest_book(book_path=DEFAULT_BOOK_PATH)
    if limit is not None:
        # NOTE(review): the cap is applied twice — here by slicing AND below via
        # ``limit=limit``. Kept both to preserve behavior exactly; if
        # ``extract_all_chunks`` honors ``limit`` the same way, the slice is
        # redundant and one of the two could be dropped — confirm before removing.
        docs = docs[:limit]
    extractions = extract_all_chunks(docs, limit=limit)
    # Resolve the target path once so the saved location and the log line
    # cannot diverge (the original evaluated the ``or`` expression twice).
    out_path = out_json or DEFAULT_EXTRACTIONS_JSON
    save_extractions(extractions, out_path=out_path)
    print(f"[extract] {len(extractions)} extractions saved to {out_path}")
    return extractions
def run_load(docs: list | None = None, extractions: list[dict] | None = None) -> None:
    """Step 3: Load passages and extraction batch into Aura.

    Skips entirely when any Aura credential is missing from config. With no
    extractions available, only Passage nodes are loaded.

    Args:
        docs: Passage Documents; re-ingested from ``DEFAULT_BOOK_PATH`` when None.
        extractions: Extraction dicts; reloaded from disk when None.
    """
    credentials = (config.AURA_URI, config.AURA_USER, config.AURA_PASSWORD)
    if not all(credentials):
        print("[load] Skip: AURA_URI/AURA_USER/AURA_PASSWORD not set")
        return
    if docs is None:
        docs = ingest_book(book_path=DEFAULT_BOOK_PATH)
    if extractions is None:
        extractions = load_extractions()
    if extractions:
        load_extraction_batch(extractions, passage_docs=docs)
        print("[load] Aura load done.")
    else:
        print("[load] No extractions; loading passages only.")
        count = load_passages_from_docs(docs)
        print(f"[load] Loaded {count} Passage nodes into Aura.")
def run_query(question: str | None = None) -> str:
    """Step 4: Run one GraphRAG query and return the answer.

    Args:
        question: Question to ask; a demo question about Alice is used when
            None (or empty).

    Returns:
        The answer string produced by ``answer_question``.
    """
    asked = question or "Who was at the tea party with Alice, and where did it take place?"
    answer = answer_question(asked)
    print(f"[query] Q: {asked}")
    print(f"[query] A: {answer}")
    return answer
def main() -> None:
    """Parse CLI flags and run the requested pipeline steps in fixed order.

    Steps chained in one process share in-memory state (``docs``,
    ``extractions``); a step run alone reloads what it needs from disk.
    """
    parser = argparse.ArgumentParser(description="GraphRAG pipeline: chunk → extract → load → query")
    parser.add_argument(
        "--steps",
        type=str,
        default="chunk,extract,load,query",
        help="Comma-separated steps: chunk, extract, load, query (default: all)",
    )
    parser.add_argument(
        "--extract-limit",
        type=int,
        default=None,
        help="Limit extraction to first N chunks (for testing)",
    )
    parser.add_argument(
        "--query",
        type=str,
        default=None,
        help="Optional question to run in query step",
    )
    args = parser.parse_args()

    requested = {token.strip().lower() for token in args.steps.split(",") if token.strip()}

    # In-memory state carried between chained steps; None means "reload from disk".
    docs = None
    extractions = None

    if "chunk" in requested:
        docs = run_chunk()
    if "extract" in requested:
        docs = docs if docs is not None else ingest_book(book_path=DEFAULT_BOOK_PATH)
        extractions = run_extract(docs=docs, limit=args.extract_limit)
    if "load" in requested:
        docs = docs if docs is not None else ingest_book(book_path=DEFAULT_BOOK_PATH)
        if extractions is None:
            # e.g. ``--steps load`` after a previous extract run
            extractions = load_extractions()
        run_load(docs=docs, extractions=extractions)
    if "query" in requested:
        run_query(question=args.query)


if __name__ == "__main__":
    main()