Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -4,3 +4,8 @@ __pycache__
.env*
.venv/
logs/
pageindex.egg-info/
dist/
*.db
venv/
uv.lock
62 changes: 62 additions & 0 deletions examples/cloud_demo.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,62 @@
"""
Agentic Vectorless RAG with PageIndex SDK - Cloud Demo

Uses CloudClient for fully-managed document indexing and QA.
No LLM API key needed — the cloud service handles everything.

Steps:
1 — Upload and index a PDF via PageIndex cloud
2 — Stream a question with tool call visibility

Requirements:
pip install pageindex
export PAGEINDEX_API_KEY=your-api-key
"""
import asyncio
import os
from pathlib import Path
import requests
from pageindex import CloudClient

# Paths: the sample PDF is cached next to this script under documents/.
_EXAMPLES_DIR = Path(__file__).parent
PDF_URL = "https://arxiv.org/pdf/1706.03762.pdf"
PDF_PATH = _EXAMPLES_DIR / "documents" / "attention.pdf"

# Fetch the paper on first run; later runs reuse the cached copy.
if not PDF_PATH.exists():
    print(f"Downloading {PDF_URL} ...")
    PDF_PATH.parent.mkdir(parents=True, exist_ok=True)
    with requests.get(PDF_URL, stream=True, timeout=30) as resp:
        resp.raise_for_status()
        with PDF_PATH.open("wb") as out:
            for part in resp.iter_content(chunk_size=8192):
                if part:
                    out.write(part)
    print("Download complete.\n")

# Cloud client: the API key is read from the environment (raises KeyError if unset).
client = CloudClient(api_key=os.environ["PAGEINDEX_API_KEY"])
col = client.collection()

# Upload and index the document; returns the cloud document id.
doc_id = col.add(str(PDF_PATH))
print(f"Indexed: {doc_id}\n")

# Streaming query
stream = col.query("What is the main contribution of this paper?", stream=True)


async def main():
    """Drain the query event stream, echoing answer text and tool calls."""
    mid_line = False  # True while answer text has been printed without a trailing newline
    async for event in stream:
        kind = event.type
        if kind == "answer_delta":
            print(event.data, end="", flush=True)
            mid_line = True
        elif kind == "tool_call":
            if mid_line:
                print()  # finish the partial answer line before the tool note
                mid_line = False
            call = event.data
            print(f"[tool call] {call['name']}({call.get('args', '')})")
        elif kind == "answer_done":
            print()
            mid_line = False


asyncio.run(main())
69 changes: 69 additions & 0 deletions examples/local_demo.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,69 @@
"""
Agentic Vectorless RAG with PageIndex SDK - Local Demo

A simple example of using LocalClient for self-hosted document indexing
and agent-based QA. The agent uses OpenAI Agents SDK to reason over
the document's tree structure index.

Steps:
1 — Download and index a PDF
2 — Stream a question with tool call visibility

Requirements:
pip install pageindex
export OPENAI_API_KEY=your-api-key # or any LiteLLM-supported provider
"""
import asyncio
from pathlib import Path
import requests
from pageindex import LocalClient

# Paths and model config; the sample PDF is cached under documents/.
_EXAMPLES_DIR = Path(__file__).parent
PDF_URL = "https://arxiv.org/pdf/1706.03762.pdf"
PDF_PATH = _EXAMPLES_DIR / "documents" / "attention.pdf"
WORKSPACE = _EXAMPLES_DIR / "workspace"
MODEL = "gpt-4o-2024-11-20"  # any LiteLLM-supported model

# Fetch the paper on first run; later runs reuse the cached copy.
if not PDF_PATH.exists():
    print(f"Downloading {PDF_URL} ...")
    PDF_PATH.parent.mkdir(parents=True, exist_ok=True)
    with requests.get(PDF_URL, stream=True, timeout=30) as resp:
        resp.raise_for_status()
        with PDF_PATH.open("wb") as out:
            for part in resp.iter_content(chunk_size=8192):
                if part:
                    out.write(part)
    print("Download complete.\n")

# Self-hosted client: indexes into a local workspace directory.
client = LocalClient(model=MODEL, storage_path=str(WORKSPACE))
col = client.collection()

# Parse and index the document; returns its document id.
doc_id = col.add(str(PDF_PATH))
print(f"Indexed: {doc_id}\n")

# Streaming query
stream = col.query(
    "What is the main architecture proposed in this paper and how does self-attention work?",
    stream=True,
)


async def main():
    """Drain the query event stream, printing answer text and tool activity."""
    streamed_text = False  # True while answer text is mid-line (no trailing newline yet)
    async for event in stream:
        if event.type == "answer_delta":
            print(event.data, end="", flush=True)
            streamed_text = True
        elif event.type == "tool_call":
            if streamed_text:
                print()  # finish the partial answer line before the tool note
                streamed_text = False
            print(f"[tool call] {event.data['name']}")
        elif event.type == "tool_result":
            # Convert once up front: the previous code called str() twice and,
            # for short payloads, printed the raw (possibly non-str) object.
            text = str(event.data)
            preview = text[:200] + "..." if len(text) > 200 else text
            print(f"[tool output] {preview}")
        elif event.type == "answer_done":
            print()
            streamed_text = False


asyncio.run(main())
40 changes: 39 additions & 1 deletion pageindex/__init__.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,42 @@
# pageindex/__init__.py
"""Public package surface: upstream (legacy) exports plus the SDK API."""

# Upstream exports (backward compatibility)
from .page_index import *
from .page_index_md import md_to_tree
from .retrieve import get_document, get_document_structure, get_page_content

# SDK exports. PageIndexClient doubles as the legacy client entry point,
# so a single import line covers both surfaces (previously imported twice).
from .client import PageIndexClient, LocalClient, CloudClient
from .config import IndexConfig
from .collection import Collection
from .parser.protocol import ContentNode, ParsedDocument, DocumentParser
from .storage.protocol import StorageEngine
from .events import QueryEvent
from .errors import (
    PageIndexError,
    PageIndexAPIError,
    CollectionNotFoundError,
    DocumentNotFoundError,
    IndexingError,
    CloudAPIError,
    FileTypeError,
)

__all__ = [
    # Upstream names kept visible to `from pageindex import *`
    # (they were imported above for backward compatibility, so hiding
    # them from star-imports would silently break legacy callers).
    "md_to_tree",
    "get_document",
    "get_document_structure",
    "get_page_content",
    # SDK API
    "PageIndexClient",
    "LocalClient",
    "CloudClient",
    "IndexConfig",
    "Collection",
    "ContentNode",
    "ParsedDocument",
    "DocumentParser",
    "StorageEngine",
    "QueryEvent",
    "PageIndexError",
    "PageIndexAPIError",
    "CollectionNotFoundError",
    "DocumentNotFoundError",
    "IndexingError",
    "CloudAPIError",
    "FileTypeError",
]
93 changes: 93 additions & 0 deletions pageindex/agent.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,93 @@
# pageindex/agent.py
from __future__ import annotations
from typing import AsyncIterator
from .events import QueryEvent
from .backend.protocol import AgentTools


# Instructions sent verbatim to the model on every query. This string is
# runtime behavior, not documentation — edit with care.
SYSTEM_PROMPT = """
You are PageIndex, a document QA assistant.
TOOL USE:
- Call list_documents() to see available documents.
- Call get_document(doc_id) to confirm status and page/line count.
- Call get_document_structure(doc_id) to identify relevant page ranges.
- Call get_page_content(doc_id, pages="5-7") with tight ranges; never fetch the whole document.
- Before each tool call, output one short sentence explaining the reason.
IMAGES:
- Page content may contain image references like ![image](path). Always preserve these in your answer so the downstream UI can render them.
- Place images near the relevant context in your answer.
Answer based only on tool output. Be concise.
"""


class QueryStream:
    """Streaming query result, similar to OpenAI's RunResultStreaming.

    Usage:
        stream = col.query("question", stream=True)
        async for event in stream:
            if event.type == "answer_delta":
                print(event.data, end="", flush=True)
    """

    def __init__(self, tools: AgentTools, question: str, model: str | None = None):
        # Imported lazily so the OpenAI Agents SDK is only required when a
        # query is actually constructed.
        from agents import Agent
        from agents.model_settings import ModelSettings
        self._agent = Agent(
            name="PageIndex",
            instructions=SYSTEM_PROMPT,
            tools=tools.function_tools,
            mcp_servers=tools.mcp_servers,
            model=model,
            # Sequential tool calls keep the emitted event stream ordered:
            # each tool_call is followed by its tool_result.
            model_settings=ModelSettings(parallel_tool_calls=False),
        )
        self._question = question

    async def stream_events(self) -> AsyncIterator[QueryEvent]:
        """Async generator yielding QueryEvent as they arrive."""
        from agents import Runner, ItemHelpers
        from agents.stream_events import RawResponsesStreamEvent, RunItemStreamEvent
        from openai.types.responses import ResponseTextDeltaEvent

        streamed_run = Runner.run_streamed(self._agent, self._question)
        async for event in streamed_run.stream_events():
            if isinstance(event, RawResponsesStreamEvent):
                # Token-level deltas of the model's answer text.
                if isinstance(event.data, ResponseTextDeltaEvent):
                    yield QueryEvent(type="answer_delta", data=event.data.delta)
            elif isinstance(event, RunItemStreamEvent):
                item = event.item
                if item.type == "tool_call_item":
                    raw = item.raw_item
                    # "arguments" may be absent on some raw item types,
                    # hence the "{}" fallback.
                    yield QueryEvent(type="tool_call", data={
                        "name": raw.name, "args": getattr(raw, "arguments", "{}"),
                    })
                elif item.type == "tool_call_output_item":
                    yield QueryEvent(type="tool_result", data=str(item.output))
                elif item.type == "message_output_item":
                    # Full assembled message text, emitted when the answer
                    # is complete; skipped when empty.
                    text = ItemHelpers.text_message_output(item)
                    if text:
                        yield QueryEvent(type="answer_done", data=text)

    def __aiter__(self) -> AsyncIterator[QueryEvent]:
        # Lets callers do `async for event in stream` directly.
        return self.stream_events()


class AgentRunner:
    """Non-streaming agent executor over a set of document tools.

    Builds a fresh Agent per question and runs it to completion
    synchronously, returning only the final answer text.
    """

    def __init__(self, tools: AgentTools, model: str = None):
        # Construction is cheap; the Agents SDK is imported only in run().
        self._tools = tools
        self._model = model

    def run(self, question: str) -> str:
        """Sync non-streaming query. Returns answer string."""
        from agents import Agent, Runner
        from agents.model_settings import ModelSettings

        # Sequential tool calls, matching QueryStream's configuration.
        settings = ModelSettings(parallel_tool_calls=False)
        qa_agent = Agent(
            name="PageIndex",
            instructions=SYSTEM_PROMPT,
            tools=self._tools.function_tools,
            mcp_servers=self._tools.mcp_servers,
            model=self._model,
            model_settings=settings,
        )
        return Runner.run_sync(qa_agent, question).final_output
Empty file added pageindex/backend/__init__.py
Empty file.
Loading
Loading