forked from KnowledgeXLab/LeanRAG
-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathleanrag_cli.py
More file actions
executable file
·609 lines (530 loc) · 25.6 KB
/
leanrag_cli.py
File metadata and controls
executable file
·609 lines (530 loc) · 25.6 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
#!/usr/bin/env python3
"""LeanRAG CLI - clean rebuilt version.
Commands:
chunk -> create chunk file(s)
extract -> run entity & relation extraction into SQLite (dqlite/leanrag.db)
build -> push entities/relations to Neo4j + build vector index (working_dir optional)
query -> query graph & semantic layer
pipeline -> convenience scaffold (manual extract/build still required)
check -> environment & service diagnostics
"""
import os
import sys

# Make the project root importable *before* any project-local imports below.
# (Previously this append ran AFTER `from tools.utils import ...`, so launching
# the CLI from outside the repo root could fail on that import.)
sys.path.append(os.getcwd())

import json
import logging
from pathlib import Path
from typing import Optional, Dict, Any, List

import click
from dotenv import load_dotenv

from tools.utils import OpenAIChatClient

# Load .env so credentials/endpoints are visible to every command via os.getenv.
load_dotenv()

logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')
logger = logging.getLogger(__name__)
def validate_environment() -> bool:
    """Return True when every mandatory environment variable is set.

    Checks OPENAI_API_KEY plus the Neo4j connection trio; when any are
    absent, prints the missing names and returns False so the calling
    command can bail out early.
    """
    mandatory = ('OPENAI_API_KEY', 'GRAPH_URI', 'GRAPH_USER', 'GRAPH_PASSWORD')
    absent = []
    for var_name in mandatory:
        if not os.getenv(var_name):
            absent.append(var_name)
    if not absent:
        return True
    click.echo(f"❌ Missing environment vars: {', '.join(absent)}")
    return False
@click.group()
@click.version_option(version="1.1.0")
def cli():
    """LeanRAG unified CLI."""
    # Group entry point: subcommands (chunk/extract/build/query/visualize/
    # pipeline/check) attach themselves via @cli.command().
    pass
@cli.command()
@click.argument('input_path', type=click.Path(exists=True))
@click.option('--output-dir', '-o', default='output', help='Output directory for results')
@click.option('--chunk-size', default=1024, help='Max tokens per chunk')
@click.option('--overlap', default=128, help='Token overlap between chunks')
@click.option('--strategy', type=click.Choice(['semantic', 'hybrid', 'fixed_token']), default='semantic')
@click.option('--dataset-name', default=None, help='Override dataset name')
@click.option('--sample-chunks', type=int, default=None, help='Randomly sample N chunks for a smaller test set')
@click.option('--sample-seed', type=int, default=None, help='Seed for sampling reproducibility')
def chunk(input_path: str, output_dir: str, chunk_size: int, overlap: int, strategy: str,
          dataset_name: Optional[str], sample_chunks: Optional[int], sample_seed: Optional[int]):
    """Create chunk file(s) from documents (file or directory)."""
    from file_chunk import chunk_documents, ChunkingStrategy, load_documents, load_documents_from_directory
    import random
    try:
        input_path_obj = Path(input_path)
        # Dataset name defaults to the input file/directory stem.
        if dataset_name is None:
            dataset_name = input_path_obj.stem
        out_dir = Path(output_dir) / dataset_name
        out_dir.mkdir(parents=True, exist_ok=True)
        chunk_file = out_dir / f"{dataset_name}_chunk.json"
        # Load docs from a single file or a whole directory.
        if input_path_obj.is_dir():
            documents = load_documents_from_directory(str(input_path_obj))
        else:
            documents = load_documents(str(input_path_obj))
        if not documents:
            click.echo("❌ No documents found")
            return
        click.echo(f"📄 Loaded {len(documents)} documents")
        strategy_map = {
            'semantic': ChunkingStrategy.SEMANTIC,
            'hybrid': ChunkingStrategy.HYBRID,
            'fixed_token': ChunkingStrategy.FIXED_TOKEN,
        }
        click.echo(f"🔄 Chunking (strategy={strategy}, size={chunk_size}, overlap={overlap})")
        results = chunk_documents(documents, max_token_size=chunk_size, overlap_token_size=overlap,
                                  strategy=strategy_map[strategy])
        # Flatten per-document chunk lists into one serializable record list.
        flattened: List[Dict[str, Any]] = []
        for doc in results:
            for ch in doc['chunks']:
                flattened.append({
                    'hash_code': ch['hash_code'],
                    'text': ch['text'],
                    'tokens': ch['tokens'],
                    'quality_score': ch.get('quality_score', 0.0),
                    'strategy': ch['strategy']
                })
        original_total = len(flattened)
        full_copy = None  # holds the unsampled set so it can be preserved alongside the sample
        if sample_chunks and 0 < sample_chunks < original_total:
            full_copy = flattened
            rng = random.Random(sample_seed)  # seeded for reproducible subsets
            flattened = rng.sample(flattened, sample_chunks)
            click.echo(f"🎲 Sampled {len(flattened)} / {original_total} chunks (seed={sample_seed})")
        elif sample_chunks and sample_chunks >= original_total:
            click.echo("⚠️ sample-chunks >= total; using all chunks")
        with open(chunk_file, 'w', encoding='utf-8') as f:
            json.dump(flattened, f, ensure_ascii=False, indent=2)
        if full_copy is not None:
            # Keep the full chunk set next to the sampled one for later reuse.
            with open(out_dir / f"{dataset_name}_chunk_full.json", 'w', encoding='utf-8') as f:
                json.dump(full_copy, f, ensure_ascii=False, indent=2)
        click.echo(f"✅ Wrote {len(flattened)} chunks → {chunk_file}")
    except Exception as e:
        # Consistent error handling with the other commands (extract/build/query).
        click.echo(f"❌ Chunking error: {e}", err=True)
        raise click.Abort()
@cli.command()
@click.argument('input_path', type=click.Path(exists=True))
@click.option('--output-dir', '-o', default='output', help='Output directory (workspace)')
def extract(input_path: str, output_dir: str):
    """Extract triples and entities from chunks (auto-detect *_chunk.json if directory given).
    INPUT_PATH: Path to a chunk JSON file OR a directory containing one *_chunk.json file
    Examples:
    leanrag extract output/mix/mix_chunk.json
    leanrag extract output/mix/ (auto-detects mix_chunk.json)
    """
    if not validate_environment():
        return
    try:
        path_obj = Path(input_path)
        chosen_chunk: Optional[Path] = None
        if path_obj.is_dir():
            # Prefer direct children; fall back to a recursive search.
            candidates = list(path_obj.glob('*_chunk.json')) or list(path_obj.rglob('*_chunk.json'))
            if not candidates:
                click.echo(f"❌ No *_chunk.json found in directory {input_path}")
                return
            # Most recently modified file wins when several are present.
            candidates.sort(key=lambda p: p.stat().st_mtime, reverse=True)
            chosen_chunk = candidates[0]
            if len(candidates) > 1:
                click.echo("⚠️ Multiple chunk files; using most recent: " + str(chosen_chunk))
        else:
            chosen_chunk = path_obj
            # Warn-but-continue: naming convention check only, not a hard error.
            if not chosen_chunk.name.endswith('_chunk.json'):
                click.echo("⚠️ File does not end with _chunk.json – continuing")
        if not chosen_chunk.exists():
            click.echo("❌ Chunk file not found")
            return
        with open(chosen_chunk, 'r', encoding='utf-8') as f:
            raw_chunks = json.load(f)
        # hash_code -> text mapping consumed by the triple extractor.
        chunks_map = {c['hash_code']: c['text'] for c in raw_chunks}
        click.echo(f"🔍 Extracting over {len(chunks_map)} chunks ...")
        from GraphExtraction.chunk import triple_extraction, openai_generate_text
        async def runner():
            # Adapter: triple_extraction expects an async LLM callable; flatten
            # any chat history into a single prompt for the completion endpoint.
            async def llm_func(prompt, history_messages=None):
                if history_messages:
                    hist = '\n'.join(m['content'] for m in history_messages if m['role'] in ('user','assistant'))
                    prompt_full = hist + '\n' + prompt
                else:
                    prompt_full = prompt
                resp = await openai_generate_text(prompt_full)
                return resp or ''
            return await triple_extraction(chunks_map, llm_func)
        import asyncio, sqlite3, json as _json
        from database_utils import create_db_table_sqlite, get_db_path, get_embedding, store_text_units_records
        create_db_table_sqlite(output_dir)
        entities, relations = asyncio.run(runner())
        # Merge entities by name: concatenate descriptions with ' | ' and
        # union the pipe-separated source_id lists.
        merged = {}
        for e in entities:
            name = e.get('entity_name')
            if not name:
                continue
            entry = merged.setdefault(name, {'entity_name': name, 'description': '', 'source_id': '', 'degree': 0})
            d = (e.get('description') or '').strip()
            if d:
                entry['description'] = (entry['description'] + ' | ' + d).strip(' |') if entry['description'] else d
            sid = (e.get('source_id') or '').strip()
            if sid and sid not in entry['source_id'].split('|'):
                entry['source_id'] = (entry['source_id'] + '|' + sid).strip('|')
        for v in merged.values():
            if v['source_id']:
                # Deduplicate and sort source ids for deterministic storage.
                v['source_id'] = '|'.join(sorted(set(filter(None, v['source_id'].split('|')))))
        # Normalize relations (dedupe by src,tgt,desc)
        norm_rels = []
        seen = set()
        for r in relations:
            # Some extractor outputs wrap the relation dict in a list; unwrap it.
            if isinstance(r, list):
                r = next((x for x in r if isinstance(x, dict)), None)
            if not isinstance(r, dict):
                continue
            src = r.get('src_id') or r.get('source')
            tgt = r.get('tgt_id') or r.get('target')
            if not src or not tgt:
                continue
            desc = r.get('description') or ''
            key = (src, tgt, desc)
            if key in seen:
                continue
            seen.add(key)
            try:
                weight = int(r.get('weight', 1))
            except Exception:
                weight = 1  # non-numeric weights default to 1
            norm_rels.append({'src_tgt': src, 'tgt_src': tgt, 'description': desc, 'weight': weight})
        db_path = get_db_path()
        conn = sqlite3.connect(db_path)
        cur = conn.cursor()
        ent_count = 0
        for ent in merged.values():
            vec = []
            if ent['description']:
                # Embed the merged description; [0] = vector for the single input.
                vec = get_embedding(ent['description'])[0]
            cur.execute(
                """INSERT OR REPLACE INTO entities (entity_name, description, source_id, degree, parent, level, vector, is_new)
                VALUES (?, ?, ?, ?, ?, ?, ?, 1)""",
                (ent['entity_name'], ent['description'], ent['source_id'], ent['degree'], None, 0, _json.dumps(vec) if vec else None)
            )
            ent_count += 1
        rel_count = 0
        for rel in norm_rels:
            cur.execute(
                """INSERT INTO relations (src_tgt, tgt_src, description, weight, level, is_new)
                VALUES (?, ?, ?, ?, ?, 1)""",
                (rel['src_tgt'], rel['tgt_src'], rel['description'], rel['weight'], 0)
            )
            rel_count += 1
        conn.commit()
        conn.close()
        # Store the raw chunk texts so queries can cite source passages later.
        inserted_units = store_text_units_records(raw_chunks) if raw_chunks else 0
        click.echo("✅ Extraction complete")
        click.echo(f" 🧬 Entities inserted: {ent_count}")
        click.echo(f" 🔗 Relations inserted: {rel_count}")
        click.echo(f" 📚 Text units stored: {inserted_units}")
        click.echo(f" 💾 SQLite: {db_path}")
        click.echo(" ➜ Next: leanrag build (working_dir optional)")
    except Exception as e:
        click.echo(f"❌ Extraction error: {e}", err=True)
        raise click.Abort()
@cli.command()
@click.argument('working_dir', required=False, default='.', type=click.Path(exists=True))
@click.option('--refresh', is_flag=True, help='Clear Neo4j/Qdrant before loading hierarchy')
@click.option('--chunks-file', help='Optional chunk file to ingest text units (legacy)')
@click.option('--workers', '-w', type=int, help='Override worker count for hierarchy build')
def build(working_dir: str, refresh: bool, chunks_file: Optional[str], workers: Optional[int]):
    """Build graph & vector index from extracted (SQLite) data. WORKING_DIR optional."""
    if not validate_environment():
        return
    try:
        from database_utils import (
            create_db_table_sqlite,
            mark_entities_relations_processed,
            store_text_units_in_db,
        )
        import build_graph as build_pipeline
        from build_graph import hierarchical_clustering, embedding as pipeline_embedding
        create_db_table_sqlite(working_dir)
        chat_client = OpenAIChatClient()
        build_pipeline.WORKING_DIR = working_dir
        # Worker count: explicit --workers wins, else CPU count, min 1.
        cpu_workers = os.cpu_count() or 4
        max_workers = max(1, workers or cpu_workers)
        # NOTE: removed unused local `json_format` (dead code in the original).
        config = {
            'max_workers': max_workers,
            'working_dir': working_dir,
            'use_llm_func': lambda prompt, **kwargs: chat_client.chat(prompt, **kwargs),
            'embeddings_func': pipeline_embedding,
            'aggregate_llm_kwargs': {'response_format': None},
            'relation_llm_kwargs': {'response_format': None},
            'clear_existing': refresh,
        }
        mode_label = "full refresh" if refresh else "incremental sync"
        click.echo(f"📊 Building hierarchy with {max_workers} workers ({mode_label})")
        graph_stats = hierarchical_clustering(config) or {}
        mark_entities_relations_processed(working_dir)
        if chunks_file:
            # Legacy path: ingest text units directly from a chunk file;
            # failure here is non-fatal (best-effort).
            try:
                store_text_units_in_db(working_dir, chunks_file)
                click.echo("📚 Text units stored from chunks file")
            except Exception as te:
                click.echo(f"⚠️ Text unit store failed: {te}")
        # Report what the pipeline created; all keys are optional in graph_stats.
        cleared_nodes = graph_stats.get('cleared_nodes', 0)
        entity_created = graph_stats.get('entities', 0)
        relation_created = graph_stats.get('relationships', 0)
        community_created = graph_stats.get('communities', 0)
        sqlite_entities = graph_stats.get('sqlite_entities')
        sqlite_relations = graph_stats.get('sqlite_relations')
        sqlite_communities = graph_stats.get('sqlite_communities')
        if refresh:
            click.echo(f"🔄 Neo4j cleared ({cleared_nodes} nodes removed) and Qdrant collection recreated")
        else:
            click.echo("🔄 Hierarchy synced; Neo4j and Qdrant updated incrementally")
        click.echo(f" 🧩 Entities created: {entity_created}")
        click.echo(f" 🔗 Relationships created: {relation_created}")
        click.echo(f" 🫂 Communities created: {community_created}")
        if sqlite_entities is not None:
            click.echo(f" 💾 SQLite entities stored: {sqlite_entities}")
        if sqlite_relations is not None:
            click.echo(f" 💾 SQLite relations stored: {sqlite_relations}")
        if sqlite_communities is not None:
            click.echo(f" 💾 SQLite communities stored: {sqlite_communities}")
        # Surface the per-level summary file if the build produced one.
        summary_path = Path(working_dir) / "level_summary.txt"
        if summary_path.exists():
            click.echo("📐 Hierarchy level summary:")
            click.echo(summary_path.read_text(encoding='utf-8').rstrip())
            click.echo(f" ↳ Full summary: {summary_path}")
        else:
            click.echo("ℹ️ No hierarchical level summary found in working directory")
        click.echo("✅ Build pipeline complete")
    except Exception as e:
        click.echo(f"❌ Build error: {e}")
        raise click.Abort()
@cli.command()
@click.option('--level', type=int, help='Filter nodes by level value')
@click.option('--limit', type=int, default=200, help='Max nodes to fetch (default 200)')
@click.option('--relationship-type', '--rel-type', 'rel_type', help='Restrict to this relationship type')
@click.option('--layout', type=click.Choice(['spring','kamada','circular']), default='spring', help='Layout algorithm')
@click.option('--no-labels', is_flag=True, help='Hide node text labels')
@click.option('--output', help='Save figure to file instead of interactive window')
@click.option('--sample', is_flag=True, help='Random sample ordering (ORDER BY rand())')
@click.option('--node-size', type=int, default=500, help='Node size for drawing')
@click.option('--font-size', type=int, default=8, help='Font size for labels')
def visualize(level, limit, rel_type, layout, no_labels, output, sample, node_size, font_size):
    """Visualize a Neo4j subgraph (lightweight).
    Pulls nodes (optionally filtered by --level) plus internal relationships and renders
    with NetworkX + Matplotlib. Colors are assigned per level.
    """
    if not validate_environment():
        return
    # Surface a friendly install hint when the optional viz stack is absent.
    try:
        from visualize_graph import fetch_subgraph, build_nx_graph, draw_graph
    except ImportError as ie:
        click.echo(f"❌ Missing visualization dependencies: {ie}. Run: pip install networkx matplotlib")
        return
    try:
        node_rows, rel_rows = fetch_subgraph(level, limit, rel_type, sample)
        if not node_rows:
            click.echo("ℹ️ No nodes returned for given filters")
            return
        click.echo(f"📥 Retrieved {len(node_rows)} nodes / {len(rel_rows)} relationships")
        import networkx as nx  # noqa: F401 (ensure dependency present)
        graph = build_nx_graph(node_rows, rel_rows)
        draw_graph(graph, layout, output, not no_labels, node_size, font_size)
        done_msg = (
            f"✅ Visualization saved to {output}"
            if output
            else "✅ Visualization complete (window may still be open)"
        )
        click.echo(done_msg)
    except Exception as e:
        click.echo(f"❌ Visualization error: {e}")
        raise click.Abort()
@cli.command()
@click.argument('query')
@click.option('--top-k', default=10, help='Number of top entities to retrieve')
@click.option('--level-mode', default=1, help='Retrieval level mode')
@click.option('--chunks-file', help='Path to chunks file (auto-detected if not provided)')
def query(query: str, top_k: int, level_mode: int, chunks_file: Optional[str]):
    """Query the knowledge graph.
    QUERY: The query string
    Example: leanrag query "What is machine learning?" --top-k 5
    """
    if not validate_environment():
        return
    try:
        from query_graph import query_graph
        click.echo(f"🔍 Querying: {query}")
        # Create config from environment and parameters
        from query_graph import embedding
        config = {
            'topk': top_k,
            'level_mode': level_mode,
            'chunks_file': chunks_file,
            'embeddings_func': embedding
        }
        # Auto-detect chunks file if not provided
        if not chunks_file:
            # Try common locations for chunks file
            potential_chunks = [
                "test_chunk.json",
                "datasets/cs/cs_chunk.json",
                "datasets/legal/legal_chunk.json",
                "datasets/agriculture/agriculture_chunk.json",
                "datasets/mix/mix_chunk.json",
                "output_science/science/science_chunk.json"
            ]
            # First existing candidate wins.
            for chunk_path in potential_chunks:
                if os.path.exists(chunk_path):
                    chunks_file = chunk_path
                    break
        else:
            # If user passed a directory, attempt auto-detect inside it
            if os.path.isdir(chunks_file):
                dir_path = Path(chunks_file)
                candidates = list(dir_path.glob("*_chunk.json"))
                if not candidates:
                    candidates = list(dir_path.rglob("*_chunk.json"))
                if not candidates:
                    click.echo(f"❌ No *_chunk.json files found inside directory: {chunks_file}")
                    return
                if len(candidates) > 1:
                    # Newest file first; it becomes the selection below.
                    candidates.sort(key=lambda p: p.stat().st_mtime, reverse=True)
                    click.echo("⚠️ Multiple chunk files found; selecting most recent for query:")
                    for idx, c in enumerate(candidates, 1):
                        marker = "✅" if idx == 1 else " "
                        click.echo(f" {marker} {idx}. {c}")
                chunks_file = str(candidates[0])
        if not chunks_file:
            click.echo("❌ Chunks file not found. Please specify with --chunks-file", err=True)
            return
        # Derive working_dir from chunks_file path
        working_dir = os.path.dirname(chunks_file)
        if not working_dir:
            working_dir = "." # If chunks_file is in root directory
        config['chunks_file'] = chunks_file
        config['working_dir'] = working_dir
        # Run query
        click.echo("⏳ Processing query...")
        ref, response = query_graph(config, None, query) # db parameter not used in new version
        click.echo("\n" + "="*50)
        click.echo("📋 REFERENCE INFORMATION:")
        click.echo(ref)
        click.echo("\n" + "="*50)
        click.echo("🤖 GENERATED RESPONSE:")
        click.echo(response)
        click.echo("="*50)
    except Exception as e:
        click.echo(f"❌ Error during query: {str(e)}", err=True)
        raise click.Abort()
@cli.command()
@click.argument('input_file', type=click.Path(exists=True))
@click.option('--output-dir', '-o', default='output', help='Output directory')
@click.option('--query', help='Query to run after building (optional)')
def pipeline(input_file: str, output_dir: str, query: Optional[str]):
    """Run the complete LeanRAG pipeline: chunk → extract → build → query.
    INPUT_FILE: Path to input documents JSONL file
    Example: leanrag pipeline datasets/mix/mix.jsonl --query "What is AI?"
    """
    if not validate_environment():
        return
    try:
        stem = Path(input_file).stem
        workspace = Path(output_dir) / stem
        workspace.mkdir(parents=True, exist_ok=True)
        click.echo("🚀 Starting LeanRAG pipeline...")
        click.echo(f" 📁 Working directory: {workspace}")
        click.echo(f" 📄 Input file: {input_file}")
        # Step 1: chunk the JSONL documents into hashed text pieces.
        click.echo("\n1️⃣ Step 1: Chunking documents...")
        chunk_file = workspace / f"{stem}_chunk.json"
        # Import here to avoid circular imports
        from file_chunk import chunk_documents, ChunkingStrategy
        documents = []
        with open(input_file, 'r', encoding='utf-8') as fh:
            for raw_line in fh:
                if not raw_line.strip():
                    continue
                record = json.loads(raw_line)
                # Accept any of the known text-bearing keys.
                text = (record.get('context') or record.get('text') or
                        record.get('content') or record.get('input', ''))
                if text:
                    documents.append(text)
        doc_results = chunk_documents(documents, strategy=ChunkingStrategy.SEMANTIC)
        # Flatten per-document chunk lists into one serializable record list.
        flattened_results = [
            {
                "hash_code": piece["hash_code"],
                "text": piece["text"],
                "tokens": piece["tokens"],
                "quality_score": piece.get("quality_score", 0.0),
                "strategy": piece["strategy"],
            }
            for doc_result in doc_results
            for piece in doc_result['chunks']
        ]
        with open(chunk_file, 'w', encoding='utf-8') as fh:
            json.dump(flattened_results, fh, ensure_ascii=False, indent=2)
        click.echo(f" ✅ Created {len(flattened_results)} chunks")
        # Steps 2-4 stay manual: print the exact commands the user should run.
        click.echo("\n2️⃣ Step 2: Extract triples and entities (GraphRAG)")
        click.echo(" ⚠️ Manual extraction step – run after configuring LLM env vars")
        click.echo(f" 💡 Run: leanrag extract {chunk_file}")
        click.echo(" 💡 Or manually configure and run the extraction scripts")
        click.echo("\n3️⃣ Step 3: Build knowledge graph")
        click.echo(f" 💡 After extraction, run: leanrag build {workspace}")
        if query:
            click.echo("\n4️⃣ Step 4: Query knowledge graph")
            click.echo(" ⚠️ Requires completed graph building first")
            click.echo(f" 💡 Run: leanrag query \"{query}\" {workspace} --chunks-file {chunk_file}")
        click.echo("\n🎉 Pipeline setup complete!")
        click.echo(" 📋 Next steps:")
        click.echo(" 1. Configure LLM endpoints in config files")
        click.echo(" 2. Run extraction: leanrag extract ...")
        click.echo(" 3. Build graph: leanrag build ...")
        if query:
            click.echo(f" 4. Query: leanrag query \"{query}\" ...")
    except Exception as e:
        click.echo(f"❌ Error in pipeline: {str(e)}", err=True)
        raise click.Abort()
@cli.command()
def check():
    """Check system status and configuration."""
    click.echo("🔍 Checking LeanRAG system status...")
    # Check environment variables
    env_status = {
        'TOGETHER_API_KEY': bool(os.getenv('TOGETHER_API_KEY')),
        'OPENAI_API_KEY': bool(os.getenv('OPENAI_API_KEY')),
        'QDRANT_URL': bool(os.getenv('QDRANT_URL')),
        'GRAPH_URI': bool(os.getenv('GRAPH_URI')),
        'GRAPH_USER': bool(os.getenv('GRAPH_USER')),
        'GRAPH_PASSWORD': bool(os.getenv('GRAPH_PASSWORD'))
    }
    click.echo("\n📋 Environment Variables:")
    for var, status in env_status.items():
        icon = "✅" if status else "❌"
        click.echo(f" {icon} {var}: {'Set' if status else 'Missing'}")
    # Check if databases are accessible
    click.echo("\n🗄️ Database Connectivity:")
    # Check Qdrant
    try:
        from qdrant_client import QdrantClient
        qdrant_url = os.getenv('QDRANT_URL', 'http://localhost:6333')
        client = QdrantClient(url=qdrant_url)
        try:
            client.get_collections()
            click.echo(" ✅ Qdrant: Connected")
        finally:
            client.close()  # release the HTTP connection pool (original leaked it)
    except Exception as e:
        click.echo(f" ❌ Qdrant: {str(e)}")
    # Check Neo4j
    try:
        from neo4j import GraphDatabase
        graph_uri = os.getenv('GRAPH_URI', 'bolt://localhost:7687')
        graph_user = os.getenv('GRAPH_USER', 'neo4j')
        graph_password = os.getenv('GRAPH_PASSWORD', 'test123456')
        # Context manager closes the driver; the original never called close().
        with GraphDatabase.driver(graph_uri, auth=(graph_user, graph_password)) as driver:
            with driver.session() as session:
                session.run("RETURN 1")
        click.echo(" ✅ Neo4j: Connected")
    except Exception as e:
        click.echo(f" ❌ Neo4j: {str(e)}")
    # Check required files
    click.echo("\n📁 Required Files:")
    required_files = ['requirements.txt', '.env']
    for file in required_files:
        exists = os.path.exists(file)
        icon = "✅" if exists else "❌"
        click.echo(f" {icon} {file}: {'Found' if exists else 'Missing'}")
    click.echo("\n💡 Tips:")
    click.echo(" - Install missing dependencies: pip install -r requirements.txt")
    click.echo(" - Set environment variables in .env file")
    click.echo(" - Start databases: docker run -p 6333:6333 qdrant/qdrant")
    click.echo(" - Start Neo4j: docker run -d --name leanrag-neo4j -p 7474:7474 -p 7687:7687 -e NEO4J_AUTH=neo4j/test123456 neo4j:5.23-community")
if __name__ == '__main__':
    # Script entry point: dispatch to the click command group.
    cli()