-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathserver.py
More file actions
81 lines (60 loc) · 1.89 KB
/
server.py
File metadata and controls
81 lines (60 loc) · 1.89 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
from __future__ import annotations
import json
import threading
from pathlib import Path
from queue import Empty, Queue
from dotenv import load_dotenv
from fastapi import FastAPI
from fastapi.responses import FileResponse, StreamingResponse
from core.pipeline import build_graph
from core.rag import build_client, get_backend, load_index
# Load variables from a local .env file; override=False keeps any value that
# is already set in the real process environment. This runs before the
# builders below, which presumably read their configuration from the
# environment (TODO confirm).
load_dotenv(override=False)
# Process-wide resources, built once at import time and shared by every
# request (they are passed into each graph invocation in stream_pipeline).
_CLIENT = build_client()
# Retrieval index loaded from disk; the path is relative to the current
# working directory. Presumably chunk texts plus their embedding/document
# matrix — verify against core.rag.load_index.
_CHUNKS, _DOC_MATRIX = load_index(Path("data/index.json"))
_GRAPH = build_graph()
# Startup banner: active backend and number of loaded chunks.
print(f"→ backend: {get_backend()} · {len(_CHUNKS)} chunks loaded")
app = FastAPI()
def _sse(event: str, data: dict) -> str:
return f"event: {event}\ndata: {json.dumps(data)}\n\n"
def stream_pipeline(query_text: str, *, timeout: float | None = 180):
    """Run the graph in a background thread and relay its events as SSE frames.

    The graph is invoked with the query, the loaded index, the shared client
    and a queue. Each ``(event_type, data)`` item the graph puts on that queue
    is forwarded as one SSE frame. Whether the graph succeeds or fails, the
    worker finally puts a ``None`` sentinel on the queue to signal it is done.

    Args:
        query_text: The user's query, passed to the graph under ``"query"``.
        timeout: Seconds to wait for the *next* queued event before emitting
            an ``error`` frame with message ``"timeout"`` and stopping.
            ``None`` waits indefinitely. The default matches the original
            hard-coded 180 s.

    Yields:
        SSE frames: every relayed event, then either a ``complete`` frame
        (graph finished normally) or an ``error`` frame (graph raised, or the
        wait timed out).

    Note:
        On timeout the worker thread is not stopped. It is a daemon thread and
        keeps running in the background until the graph returns.
    """
    import traceback

    q: Queue = Queue()
    err: list[str] = []

    def run():
        try:
            _GRAPH.invoke({
                "query": query_text,
                "chunks": _CHUNKS,
                "doc_matrix": _DOC_MATRIX,
                "client": _CLIENT,
                "event_queue": q,
            })
        except Exception as exc:  # noqa: BLE001 - reported to the client below
            # Keep the full traceback in the server log; the client only
            # receives the message text.
            traceback.print_exc()
            # Some exceptions (e.g. a bare RuntimeError()) stringify to "",
            # which would send an empty error message; fall back to the type.
            err.append(str(exc) or type(exc).__name__)
        finally:
            # Sentinel: tells the consumer loop below the worker is finished.
            q.put(None)

    threading.Thread(target=run, daemon=True).start()
    while True:
        try:
            ev = q.get(timeout=timeout)
        except Empty:
            yield _sse("error", {"message": "timeout"})
            return
        if ev is None:
            break
        event_type, data = ev
        yield _sse(event_type, data)
    # The worker appends to `err` before putting the sentinel, and the queue
    # hand-off orders those operations, so `err` is final at this point.
    if err:
        yield _sse("error", {"message": err[0]})
    else:
        yield _sse("complete", {})
@app.get("/")
def root():
    """Serve the front-end page.

    The path is relative, so it is resolved against the server process's
    current working directory, not against this file's location.
    """
    page = "static/index.html"
    return FileResponse(page)
@app.get("/query")
def query(q: str):
    """Stream pipeline progress for the query string ``q`` as Server-Sent Events.

    ``Cache-Control: no-cache`` keeps intermediaries from caching the stream.
    ``X-Accel-Buffering: no`` asks an nginx reverse proxy (if one is in front)
    not to buffer it, so frames reach the browser as they are produced.
    """
    events = stream_pipeline(q)
    sse_headers = {
        "Cache-Control": "no-cache",
        "X-Accel-Buffering": "no",
    }
    return StreamingResponse(
        events,
        headers=sse_headers,
        media_type="text/event-stream",
    )