This document provides detailed API documentation for the core KohakuRAG library components.
Location: src/kohakurag/llm.py
Chat backend powered by OpenAI's Chat Completions API with automatic rate limit handling.
OpenAIChatModel(
    *,
    model: str = "gpt-4o-mini",
    api_key: Optional[str] = None,
    organization: Optional[str] = None,
    system_prompt: str | None = None,
    max_retries: int = 5,
    base_retry_delay: float = 3.0,
    base_url: Optional[str] = None,
    max_concurrent: int = 10,
)

Parameters:

- `model` (str, default: `"gpt-4o-mini"`): OpenAI model identifier
- `api_key` (Optional[str], default: `None`): OpenAI API key. If `None`, reads from the `OPENAI_API_KEY` environment variable or `.env` file
- `organization` (Optional[str], default: `None`): OpenAI organization ID
- `system_prompt` (str | None, default: `None`): Default system prompt for all completions
- `max_retries` (int, default: `5`): Maximum number of retry attempts on rate limit errors
- `base_retry_delay` (float, default: `3.0`): Base delay in seconds for exponential backoff
- `base_url` (Optional[str], default: `None`): Optional override for the API base URL (for example, `http://localhost:8000/v1` for self-hosted vLLM/llama.cpp or an OpenAI-compatible proxy). If omitted, falls back to the `OPENAI_BASE_URL` environment variable or `.env` file when present.
- `max_concurrent` (int, default: `10`): Maximum number of concurrent API requests. Set to 0 or negative to disable rate limiting (unlimited concurrency).
Raises:
- `ImportError`: If `openai>=1.0.0` is not installed
- `ValueError`: If no API key is found
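Since `base_url` accepts any OpenAI-compatible endpoint, the same class can point at a self-hosted server. A minimal sketch, where the URL, model name, and key are placeholders rather than library defaults:

```python
from kohakurag.llm import OpenAIChatModel

# Placeholder values: point these at whatever your local server exposes.
chat = OpenAIChatModel(
    model="gpt-4o-mini",                  # model name served by the endpoint
    base_url="http://localhost:8000/v1",  # e.g. self-hosted vLLM / llama.cpp
    api_key="local-key",                  # many local servers accept any non-empty key
    max_concurrent=0,                     # disable client-side rate limiting
)
```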
Execute a chat completion request with automatic rate limit retry (async).
Parameters:
- `prompt` (str): User prompt to send to the model
- `system_prompt` (str | None, optional): Override the default system prompt for this request
Returns:
str: Model's response content
Raises:
- `openai.RateLimitError`: If rate limit persists after all retries
- `openai.BadRequestError`: For context overflow or other 400 errors
- `openai.OpenAIError`: For other API errors
Retry Behavior:
The method automatically handles rate limit errors using an intelligent retry strategy:
- Semaphore control: Limits concurrent requests via `asyncio.Semaphore(max_concurrent)`
- Server-recommended delays: Parses error messages for suggested wait times (e.g., "Please try again in 23ms")
- Exponential backoff: Falls back to 3s, 6s, 12s, 24s, 48s, ... by default if no specific delay is provided (scaled by `base_retry_delay`)
- Automatic retry: Continues until success or `max_retries` is exhausted
Example:
import asyncio
from kohakurag.llm import OpenAIChatModel

async def main():
    # Basic usage
    chat = OpenAIChatModel(model="gpt-4o-mini")
    response = await chat.complete("Explain quantum computing in one sentence.")

    # Configure concurrency and retry behavior
    chat = OpenAIChatModel(
        model="gpt-4o-mini",
        max_concurrent=10,     # Max 10 concurrent requests
        max_retries=10,        # More retries for TPM-constrained accounts
        base_retry_delay=2.0,  # Shorter initial backoff than the 3.0s default
    )
    response = await chat.complete("What is the capital of France?")

    # Override system prompt per request
    chat = OpenAIChatModel(
        system_prompt="You are a helpful assistant."
    )
    response = await chat.complete(
        "Explain RAG systems",
        system_prompt="You are an expert in information retrieval."
    )

    # Batch processing with asyncio.gather()
    questions = ["Q1", "Q2", "Q3"]
    responses = await asyncio.gather(*[
        chat.complete(q) for q in questions
    ])

asyncio.run(main())

The retry mechanism is designed to work seamlessly with OpenAI's rate limits:
Supported Error Formats:

- "Please try again in 23ms" → waits 0.023s + 0.1s buffer
- "Please try again in 1.5s" → waits 1.5s + 0.1s buffer
- "Please try again in 2m" → waits 120s + 0.1s buffer
Exponential Backoff Schedule:
| Attempt | Wait Time (seconds) |
|---|---|
| 1 | 3.0 |
| 2 | 6.0 |
| 3 | 12.0 |
| 4 | 24.0 |
| 5 | 48.0 |
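A minimal sketch of the delay-selection logic described above (illustrative only; it does not reproduce the library's internal code):

```python
# Illustrative sketch of the retry-delay policy: prefer a server-suggested
# wait parsed from the error message, otherwise fall back to exponential
# backoff scaled by base_retry_delay.
import re

def compute_retry_delay(error_message: str, attempt: int, base_retry_delay: float = 3.0) -> float:
    match = re.search(r"try again in ([\d.]+)(ms|s|m)", error_message)
    if match:
        value, unit = float(match.group(1)), match.group(2)
        seconds = value / 1000 if unit == "ms" else value * 60 if unit == "m" else value
        return seconds + 0.1  # small buffer on top of the server hint
    # No hint found: 3s, 6s, 12s, ... with the default base_retry_delay=3.0
    return base_retry_delay * (2 ** attempt)

print(compute_retry_delay("Please try again in 23ms", attempt=0))  # 0.123
print(compute_retry_delay("Rate limit exceeded", attempt=2))       # 12.0
```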
Console Output Example:
Rate limit hit (attempt 1/6). Waiting 0.12s before retry...
Rate limit hit (attempt 2/6). Waiting 2.00s before retry...
Location: src/kohakurag/embeddings.py
Sentence embedding model using jinaai/jina-embeddings-v3 from Hugging Face.
JinaEmbeddingModel(
    model_name: str = "jinaai/jina-embeddings-v3",
    device: str | None = None,
    trust_remote_code: bool = True,
)

Parameters:

- `model_name` (str, default: `"jinaai/jina-embeddings-v3"`): Hugging Face model identifier
- `device` (str | None, default: `None`): Device to use (`"cuda"`, `"mps"`, `"cpu"`). Auto-detected if `None`
- `trust_remote_code` (bool, default: `True`): Allow remote code execution (required for Jina models)
Returns the embedding dimension (1024 for Jina v3).
Generate embeddings for a batch of texts (async).
Parameters:
- `texts` (Sequence[str]): List of text strings to embed
Returns:
`numpy.ndarray`: Array of shape `(len(texts), dimension)` with float32 dtype
Example:
import asyncio
from kohakurag.embeddings import JinaEmbeddingModel

async def main():
    embedder = JinaEmbeddingModel()

    # Single text
    embedding = await embedder.embed(["Hello, world!"])
    print(embedding.shape)  # (1, 1024)

    # Batch embedding
    texts = [
        "This is the first sentence.",
        "This is the second sentence.",
        "And a third one for good measure."
    ]
    embeddings = await embedder.embed(texts)
    print(embeddings.shape)  # (3, 1024)

asyncio.run(main())

Performance Notes:
- First call downloads ~2GB model from Hugging Face
- Automatically uses FP16 on CUDA/MPS for 2x speedup
- Batch processing is more efficient than individual calls
- Thread-safe via single-worker `ThreadPoolExecutor` (no manual locking needed)
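The single-worker executor pattern mentioned in the last note can be sketched generically as follows; this illustrates the technique, not KohakuRAG's actual implementation:

```python
# Generic illustration of the single-worker executor pattern: a blocking
# encode function is exposed as an async method, and the single worker
# thread serializes all calls, so no manual locking is needed.
import asyncio
from concurrent.futures import ThreadPoolExecutor

class AsyncEncoder:
    def __init__(self, encode_fn):
        self._encode_fn = encode_fn                          # blocking callable
        self._executor = ThreadPoolExecutor(max_workers=1)   # one worker = serialized access

    async def embed(self, texts):
        loop = asyncio.get_running_loop()
        return await loop.run_in_executor(self._executor, self._encode_fn, texts)

async def demo():
    encoder = AsyncEncoder(lambda texts: [len(t) for t in texts])  # stand-in for model.encode
    print(await encoder.embed(["hello", "world"]))  # [5, 5]

asyncio.run(demo())
```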
Location: src/kohakurag/datastore.py
SQLite-backed hierarchical vector store using KohakuVault.
KVaultNodeStore(
    db_path: str | Path,
    table_prefix: str = "nodes",
    dimensions: int | None = None,
)

Parameters:

- `db_path` (str | Path): Path to SQLite database file (created if it doesn't exist)
- `table_prefix` (str, default: `"nodes"`): Prefix for KohakuVault tables
- `dimensions` (int | None, default: `None`): Embedding dimension (auto-detected from first insert if `None`)
Insert or update nodes in the datastore (async).
Parameters:
- `nodes` (Sequence[StoredNode]): List of nodes to upsert
async search(query_vector: np.ndarray, k: int = 5, kinds: set[NodeKind] | None = None) -> list[RetrievalMatch]
Search for nearest neighbors (async).
Parameters:
- `query_vector` (np.ndarray): Query embedding vector
- `k` (int, default: 5): Number of results to return
- `kinds` (set[NodeKind] | None): Filter by node types
Returns:
list[RetrievalMatch]: List of matches with nodes and scores
Retrieve a node by ID (async).
Parameters:
- `node_id` (str): Node identifier
Returns:
StoredNode: Node object
Raises:
KeyError: If node not found
Example:
import asyncio
from kohakurag.datastore import KVaultNodeStore
from kohakurag.embeddings import JinaEmbeddingModel

async def main():
    # Create datastore
    store = KVaultNodeStore(
        db_path="artifacts/my_index.db",
        table_prefix="docs",
        dimensions=1024,
    )

    # Create embeddings
    embedder = JinaEmbeddingModel()
    query_embedding = (await embedder.embed(["How does RAG work?"]))[0]

    # Search
    results = await store.search(query_embedding, k=5)
    for match in results:
        print(f"[{match.score:.3f}] {match.node.text[:100]}...")

asyncio.run(main())

Location: src/kohakurag/pipeline.py
End-to-end RAG orchestration with query planning, retrieval, and answer generation.
RAGPipeline(
    store: HierarchicalVectorStore,
    embedder: EmbeddingModel,
    chat_model: ChatModel,
    planner: QueryPlanner | None = None,
)

Parameters:

- `store` (HierarchicalVectorStore): Datastore for retrieval
- `embedder` (EmbeddingModel): Embedding model for queries
- `chat_model` (ChatModel): LLM for answer generation
- `planner` (QueryPlanner | None, optional): Query expansion planner
Bulk insert pre-built nodes into the store (async).
Execute multi-query retrieval with hierarchical context expansion (async).
Simple QA: retrieve + prompt + generate (async).
Execute a complete question-answering pipeline (async).
Parameters:
- `question` (str): User question
- `system_prompt` (str): System prompt for the LLM
- `user_template` (str): Template for formatting context + question
- `additional_info` (dict[str, Any], optional): Extra metadata for the prompt
- `top_k` (int, default: 5): Number of snippets to retrieve
Returns:
`StructuredAnswerResult`: Object containing:

- `answer`: Structured answer object
- `raw_response`: Raw LLM output
- `prompt`: Final prompt sent to LLM
- `retrieval`: Retrieval result with matches and snippets
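When debugging a run it can help to dump these fields directly. A small, hypothetical helper that assumes only the attribute names documented above:

```python
def debug_print(result) -> None:
    # `result` is a StructuredAnswerResult; field names as documented above.
    print("Prompt sent to the LLM:\n", result.prompt)
    print("Raw model output:\n", result.raw_response)
    print("Structured answer:", result.answer)
    print("Retrieval (matches and snippets):", result.retrieval)
```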
Example:
import asyncio
from kohakurag import RAGPipeline
from kohakurag.datastore import KVaultNodeStore
from kohakurag.embeddings import JinaEmbeddingModel
from kohakurag.llm import OpenAIChatModel

async def main():
    # Initialize components
    store = KVaultNodeStore("artifacts/index.db")
    embedder = JinaEmbeddingModel()
    chat = OpenAIChatModel(model="gpt-4o-mini", max_concurrent=10, max_retries=5)

    # Create pipeline
    pipeline = RAGPipeline(
        store=store,
        embedder=embedder,
        chat_model=chat,
    )

    # Run Q&A
    result = await pipeline.run_qa(
        question="What is the water consumption of GPT-3 training?",
        system_prompt="Answer based only on the provided context.",
        user_template="Question: {question}\n\nContext:\n{context}\n\nAnswer:",
        top_k=6,
    )
    print(result.answer.answer_value)
    print(result.answer.explanation)

    # Batch processing with asyncio.gather()
    questions = ["Q1", "Q2", "Q3"]
    results = await asyncio.gather(*[
        pipeline.run_qa(
            question=q,
            system_prompt="Answer based on context.",
            user_template="Q: {question}\nContext: {context}\nA:",
        )
        for q in questions
    ])

asyncio.run(main())

Location: src/kohakurag/pdf_utils.py
Extract structured payload from PDF files.
Signature:
def pdf_to_document_payload(
    pdf_path: str | Path,
    metadata: dict[str, Any],
) -> DocumentPayload

Parameters:

- `pdf_path` (str | Path): Path to PDF file
- `metadata` (dict[str, Any]): Document metadata (title, author, URL, etc.)
Returns:
DocumentPayload: Structured document with sections, paragraphs, and sentences
Example:
from kohakurag.pdf_utils import pdf_to_document_payload
payload = pdf_to_document_payload(
    pdf_path="papers/attention_is_all_you_need.pdf",
    metadata={
        "id": "vaswani2017",
        "title": "Attention Is All You Need",
        "authors": "Vaswani et al.",
        "year": 2017,
    }
)

print(f"Pages: {len(payload.sections)}")
print(f"First paragraph: {payload.sections[0].paragraphs[0].text[:100]}...")
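Because the payload is hierarchical, the same attributes can be traversed exhaustively. A short sketch continuing from the `payload` built above (attribute names taken from that example):

```python
# Walk sections -> paragraphs using the attributes shown above.
for i, section in enumerate(payload.sections):
    print(f"Section {i}: {len(section.paragraphs)} paragraphs")
    for paragraph in section.paragraphs:
        print("  ", paragraph.text[:80])
```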
Location: src/kohakurag/parsers.py
Parse Markdown files with heading-based structure.
Signature:
def markdown_to_payload(
    markdown_text: str,
    metadata: dict[str, Any],
) -> DocumentPayload

Location: src/kohakurag/parsers.py
Convert plain text to structured payload with heuristic segmentation.
Signature:
def text_to_payload(
    text: str,
    metadata: dict[str, Any],
) -> DocumentPayload
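Neither parser has a dedicated example above, so here is a brief, hedged sketch exercising both; the input strings and metadata keys are illustrative, and attribute access mirrors the PDF example earlier:

```python
from kohakurag.parsers import markdown_to_payload, text_to_payload

md_payload = markdown_to_payload(
    markdown_text="# Title\n\nFirst paragraph.\n\n## Section\n\nSecond paragraph.",
    metadata={"id": "doc-md", "title": "Markdown Demo"},
)

txt_payload = text_to_payload(
    text="First paragraph of plain text.\n\nSecond paragraph of plain text.",
    metadata={"id": "doc-txt", "title": "Plain Text Demo"},
)

# Both return a DocumentPayload with the same hierarchical structure.
print(len(md_payload.sections), len(txt_payload.sections))
```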
Location: src/kohakurag/indexer.py
Build hierarchical tree and compute embeddings for documents.
DocumentIndexer(
    embedder: EmbeddingModel,
    store: HierarchicalVectorStore,
)

Parameters:

- `embedder` (EmbeddingModel): Model for generating embeddings
- `store` (HierarchicalVectorStore): Datastore for persistence
Index a single document payload (async).
Parameters:
- `payload` (DocumentPayload): Structured document to index
Returns:
list[StoredNode]: List of storable nodes with embeddings
Example:
import asyncio
from kohakurag.indexer import DocumentIndexer
from kohakurag.datastore import KVaultNodeStore
from kohakurag.embeddings import JinaEmbeddingModel
from kohakurag.pdf_utils import pdf_to_document_payload

async def main():
    # Setup
    embedder = JinaEmbeddingModel()
    store = KVaultNodeStore("artifacts/index.db", dimensions=1024)
    indexer = DocumentIndexer(embedder=embedder, store=store)

    # Index a document
    payload = pdf_to_document_payload(
        pdf_path="papers/bert.pdf",
        metadata={"id": "bert2018", "title": "BERT", "year": 2018},
    )
    nodes = await indexer.index(payload)
    await store.upsert_nodes(nodes)

asyncio.run(main())

import asyncio
import openai
from kohakurag.llm import OpenAIChatModel

async def main():
    chat = OpenAIChatModel(max_retries=3)
    try:
        response = await chat.complete("Hello!")
    except openai.RateLimitError as e:
        print(f"Rate limit exceeded after all retries: {e}")

asyncio.run(main())

from kohakurag.llm import OpenAIChatModel
try:
    chat = OpenAIChatModel()
except ValueError as e:
    print(f"API key not found: {e}")

All I/O operations in KohakuRAG are async:
- `JinaEmbeddingModel`: Thread-safe via single-worker `ThreadPoolExecutor`
- `KVaultNodeStore`: Thread-safe via single-worker `ThreadPoolExecutor` for SQLite operations
- `OpenAIChatModel`: Async with semaphore-based concurrency control
import asyncio
from kohakurag import RAGPipeline
from kohakurag.datastore import KVaultNodeStore
from kohakurag.embeddings import JinaEmbeddingModel
from kohakurag.llm import OpenAIChatModel

async def main():
    # Create shared pipeline (all components are async-safe)
    store = KVaultNodeStore("artifacts/index.db")
    embedder = JinaEmbeddingModel()
    chat = OpenAIChatModel(max_concurrent=10, max_retries=5)
    pipeline = RAGPipeline(store=store, embedder=embedder, chat_model=chat)

    # Concurrent batch processing
    questions = ["Q1", "Q2", "Q3", ...]
    results = await asyncio.gather(*[
        pipeline.run_qa(
            question=q,
            system_prompt="You are a helpful assistant.",
            user_template="Question: {question}\nContext: {context}\nAnswer:",
        )
        for q in questions
    ])

    for result in results:
        print(result.answer.answer_value)

asyncio.run(main())

| Variable | Description | Default |
|---|---|---|
| `OPENAI_API_KEY` | OpenAI API key | Required |
| `HF_HOME` | Hugging Face cache directory | `~/.cache/huggingface` |
| `CUDA_VISIBLE_DEVICES` | GPU devices to use | All available |
- Start with conservative settings:

  chat = OpenAIChatModel(
      max_concurrent=5,    # Limit concurrent requests
      max_retries=10,
      base_retry_delay=2.0,
  )

- Adjust concurrency when hitting limits:

  # In your config file (e.g., configs/text_only/answer.py)
  max_concurrent = 5  # Reduce from default 10

- Disable rate limiting for self-hosted endpoints:

  # In your config file
  max_concurrent = 0  # Unlimited

- Monitor retry messages in logs:

  Rate limit hit (attempt 1/11). Waiting 0.12s before retry...

- Use GPU when available:

  embedder = JinaEmbeddingModel(device="cuda")

- Batch embed for efficiency:

  # Good: batch embedding
  embeddings = await embedder.embed(all_texts)

  # Bad: individual calls in sequence
  embeddings = [(await embedder.embed([text]))[0] for text in all_texts]

  # Better: concurrent individual calls (if needed)
  embeddings = await asyncio.gather(*[
      embedder.embed([text]) for text in all_texts
  ])

- Use consistent table prefixes:

  store = KVaultNodeStore("index.db", table_prefix="v2")  # Isolate versions

- Backup before re-indexing:

  cp artifacts/wattbot.db artifacts/wattbot_backup.db
For more examples, see the Usage Guide and WattBot Playbook.