diff --git a/.env.example b/.env.example index 8447f2f..450c22e 100644 --- a/.env.example +++ b/.env.example @@ -3,7 +3,7 @@ OPENAI_API_KEY= OPENAI_BASE_URL= OPENAI_MODEL=gpt-4.1-mini DEEPSEEK_API_KEY= -DEEPSEEK_BASE_URL=https://api.deepseek.com/v1 +DEEPSEEK_BASE_URL=https://api.deepseek.com OLLAMA_BASE_URL=http://localhost:11434/v1 LLM_TIMEOUT_S=60 MODEL_MAX_TOKENS=128000 @@ -58,11 +58,6 @@ CORS_ORIGINS=* # ── Docker host port overrides (optional) ─────────────────────────────────── # Only needed when local ports such as 8000 or 6379 are already occupied. -JETBOT_API_PORT=8000 -JETBOT_REDIS_PORT=6379 -JETBOT_POSTGRES_PORT=5432 -JETBOT_MINIO_PORT=9000 -JETBOT_MINIO_CONSOLE_PORT=9001 # ── Rate limiting (requests per minute per IP) ──────────────────────────────── RATE_LIMIT_UPLOAD=5 diff --git a/.gitignore b/.gitignore index 138ee1b..b02013e 100644 --- a/.gitignore +++ b/.gitignore @@ -4,6 +4,7 @@ __pycache__/ .mypy_cache/ *.egg-info/ .env +.env.example .venv/ .vscode/ .idea/ diff --git a/README.md b/README.md index 411e962..ab0d696 100644 --- a/README.md +++ b/README.md @@ -1,194 +1,195 @@ -# Jetbot - -Jetbot is a financial report analysis platform that turns PDF filings into structured financial statements, key notes, risk signals, event-study outputs, and trader-style summaries. It combines PDF extraction, validation, LLM orchestration, a FastAPI backend, and a Vue dashboard in one repository. - -It is designed for teams that need a single workflow to ingest reports, inspect extracted evidence, and ship the results through an API, a CLI, or a browser UI. - -## Highlights - -- End-to-end PDF pipeline for raw text, tables, statements, notes, and report generation. -- Works in mock mode out of the box, with optional OpenAI and Anthropic model routing. -- Vue 3 dashboard for reviewing original PDFs alongside extraction and analysis outputs. -- Docker-first local stack with API, worker, Redis, PostgreSQL, and MinIO. -- Pluggable storage, retrieval, tracing, and market-data integrations. -- Production-friendly defaults for auth, rate limiting, metrics, and tracing. - -## Architecture - -```mermaid -flowchart LR - A[Financial PDF] --> B[PDF extraction and OCR] - B --> C[Normalization and validation] - C --> D[LLM enrichment and report generation] - C --> E[Risk signals and event study] - D --> F[FastAPI and CLI] - E --> F - F --> G[Vue dashboard at /ui] -``` - -## Quick Start - -### Option 1: Local development - -Use this path when you want the fastest edit-run loop. - -```bash -python -m venv .venv -# activate the virtual environment for your shell -pip install -e . -make dev -``` - -The API starts at `http://127.0.0.1:8000`. - -If you want the Vue frontend in dev mode as well: - -```bash -make web-install -make web-dev -``` - -The Vite app runs at `http://127.0.0.1:5173` and proxies API requests to the local backend. - -### Option 2: Full Docker stack - -Use this path when you want the full local system with background worker and infrastructure services. - -```bash -copy .env.example .env -make docker-up -``` - -`make docker-up` now does four things in one flow: - -- builds the backend image with the Vue production bundle included -- starts the API, worker, Redis, PostgreSQL, and MinIO services -- waits until the API health endpoint is ready -- opens the frontend automatically at `http://127.0.0.1:8000/ui/` - -Set `JETBOT_OPEN_BROWSER=0` if you want to skip the automatic browser launch. - -Stop the stack with: - -```bash -make docker-down -``` - -## What You Get - -After startup, the main entry points are: - -| Surface | URL / Command | Notes | -| --- | --- | --- | -| Web UI | `http://127.0.0.1:8000/ui/` | Review uploaded PDFs, tables, statements, signals, and generated reports | -| API | `http://127.0.0.1:8000/v1` | Programmatic ingestion and retrieval | -| OpenAPI docs | `http://127.0.0.1:8000/docs` | Interactive API explorer | -| Health | `http://127.0.0.1:8000/health` | Liveness probe | -| Metrics | `http://127.0.0.1:8000/metrics` | Prometheus endpoint | -| CLI | `python -m src.cli --help` | Local automation and scripting | - -## Common Workflows - -### Analyze a PDF from the CLI - -```bash -python -m src.cli analyze --pdf path/to/report.pdf --out data --company "Example Co" --period-end 2025-12-31 -``` - -### Run the bundled real-PDF example - -```bash -python examples/real_pdf_analysis/run_example.py -``` - -### Call the API directly - -```bash -curl -F "file=@path/to/report.pdf" \ - -H "X-API-Key: your-key" \ - http://127.0.0.1:8000/v1/documents - -curl -X POST \ - -H "X-API-Key: your-key" \ - http://127.0.0.1:8000/v1/documents//analyze -``` - -## Configuration - -Jetbot starts in mock mode if no provider key is configured. Most teams only need a small set of environment variables to get productive: - -| Variable | Purpose | Default | -| --- | --- | --- | -| `OPENAI_API_KEY` | Enable OpenAI-backed extraction and reporting | empty | -| `ANTHROPIC_API_KEY` | Enable Anthropic-backed models | empty | -| `LLM_DEFAULT_MODEL` | Default router target in `provider:model` format | empty | -| `LLM_EXTRACTION_MODEL` | Override the extraction model | empty | -| `LLM_REPORT_MODEL` | Override the reporting model | empty | -| `RAG_MODE` | Retrieval mode: `token_overlap`, `embedding`, `hybrid` | `token_overlap` | -| `TASK_BACKEND` | `background` or `celery` | `background` | -| `STORAGE_BACKEND` | `local` or `postgres` | `local` | -| `API_KEYS` | Comma-separated API keys; blank disables auth | empty | -| `JETBOT_API_PORT` | Host port for the Dockerized API/UI | `8000` | - -See `.env.example` for the full configuration surface, including tracing, storage, port overrides, rate limiting, and market-data settings. - -## Optional Capability Packs - -Install only the packages you need: - -```bash -pip install -e ".[embeddings]" -pip install -e ".[anthropic]" -pip install -e ".[celery]" -pip install -e ".[postgres]" -pip install -e ".[s3]" -pip install -e ".[market]" -pip install -e ".[monitoring]" -pip install -e ".[all]" -``` - -## Development - -```bash -make test -make eval -make fmt -make lint -make typecheck -make web-lint -make web-build -``` - -The repository is organized around a small number of clear surfaces: - -- `src/api/` for HTTP entry points and application wiring -- `src/pdf/` for extraction, rendering, tables, and OCR -- `src/finance/` for schemas, normalization, validation, and signal logic -- `src/agent/` for pipeline orchestration and state handling -- `src/market/` for event-study analysis and market providers -- `web/` for the Vue 3 dashboard -- `tests/` for API, storage, pipeline, frontend-adjacent, and integration coverage -- `docs/` for architecture, branch protection, and project notes - -## Contributing - -All changes land through pull requests. - -```bash -git checkout -b feat/ -bash scripts/local_ci.sh -git push -u origin HEAD -gh pr create --base main --fill -``` - -Before opening a PR, make sure local CI passes. The script covers Python linting, typing, tests, and the web checks that mirror CI. - -See `CONTRIBUTING.md`, `CODE_OF_CONDUCT.md`, `SECURITY.md`, and `docs/BRANCH_PROTECTION.md` for project policy and contribution details. - -## License - -MIT. See `LICENSE`. - -## Not Financial Advice - +# Jetbot + +Jetbot is a financial report analysis platform that turns PDF filings into structured financial statements, key notes, risk signals, event-study outputs, and trader-style summaries. It combines PDF extraction, validation, LLM orchestration, a FastAPI backend, and a Vue dashboard in one repository. + +It is designed for teams that need a single workflow to ingest reports, inspect extracted evidence, and ship the results through an API, a CLI, or a browser UI. + +## Highlights + +- End-to-end PDF pipeline for raw text, tables, statements, notes, and report generation. +- Works in mock mode out of the box, with optional OpenAI and Anthropic model routing. +- Vue 3 dashboard for reviewing original PDFs alongside extraction and analysis outputs. +- Docker-first local stack with API, worker, Redis, PostgreSQL, and MinIO. +- Pluggable storage, retrieval, tracing, and market-data integrations. +- Production-friendly defaults for auth, rate limiting, metrics, and tracing. + +## Architecture + +```mermaid +flowchart LR + A[Financial PDF] --> B[PDF extraction and OCR] + B --> C[Normalization and validation] + C --> D[LLM enrichment and report generation] + C --> E[Risk signals and event study] + D --> F[FastAPI and CLI] + E --> F + F --> G[Vue dashboard at /ui] +``` + +## Quick Start + +### Option 1: Local development + +Use this path when you want the fastest edit-run loop. + +```bash +python -m venv .venv +# activate the virtual environment for your shell +pip install -e . +make dev +``` + +The API starts at `http://127.0.0.1:8000`. + +If you want the Vue frontend in dev mode as well: + +```bash +make web-install +make web-dev +``` + +The Vite app runs at `http://127.0.0.1:5173` and proxies API requests to the local backend. + +### Option 2: Full Docker stack + +Use this path when you want the full local system with background worker and infrastructure services. + +```bash +copy .env.example .env +make docker-up +``` + +`make docker-up` now does four things in one flow: + +- builds the backend image with the Vue production bundle included +- starts the API, worker, Redis, PostgreSQL, and MinIO services +- waits until the API health endpoint is ready +- opens the frontend automatically at `http://127.0.0.1:18000/ui/` + +Set `JETBOT_OPEN_BROWSER=0` if you want to skip the automatic browser launch. + +Stop the stack with: + +```bash +make docker-down +``` + +## What You Get + +After startup, the main entry points are: + +| Surface | URL / Command | Notes | +| --- | --- | --- | +| Web UI | `http://127.0.0.1:18000/ui/` | Review uploaded PDFs, tables, statements, signals, and generated reports | +| API | `http://127.0.0.1:18000/v1` | Programmatic ingestion and retrieval | +| OpenAPI docs | `http://127.0.0.1:18000/docs` | Interactive API explorer | +| Health | `http://127.0.0.1:18000/health` | Liveness probe | +| Metrics | `http://127.0.0.1:18000/metrics` | Prometheus endpoint | +| CLI | `python -m src.cli --help` | Local automation and scripting | + +## Common Workflows + +### Analyze a PDF from the CLI + +```bash +python -m src.cli analyze --pdf path/to/report.pdf --out data --company "Example Co" --period-end 2025-12-31 +``` + +### Run the bundled real-PDF example + +```bash +python examples/real_pdf_analysis/run_example.py +``` + +### Call the API directly + +```bash +curl -F "file=@path/to/report.pdf" \ + -H "X-API-Key: your-key" \ + http://127.0.0.1:18000/v1/documents + +curl -X POST \ + -H "X-API-Key: your-key" \ + http://127.0.0.1:18000/v1/documents//analyze +``` + +## Configuration + +Jetbot starts in mock mode if no provider key is configured. Most teams only need a small set of environment variables to get productive: + +| Variable | Purpose | Default | +| --- | --- | --- | +| `OPENAI_API_KEY` | Enable OpenAI-backed extraction and reporting | empty | +| `ANTHROPIC_API_KEY` | Enable Anthropic-backed models | empty | +| `LLM_DEFAULT_MODEL` | Default router target in `provider:model` format | empty | +| `LLM_EXTRACTION_MODEL` | Override the extraction model | empty | +| `LLM_REPORT_MODEL` | Override the reporting model | empty | +| `RAG_MODE` | Retrieval mode: `token_overlap`, `embedding`, `hybrid` | `token_overlap` | +| `TASK_BACKEND` | `background` or `celery` | `background` | +| `STORAGE_BACKEND` | `local` or `postgres` | `local` | +| `API_KEYS` | Comma-separated API keys; blank disables auth | empty | + +Docker host ports are fixed to `18000` (API/UI), `16379` (Redis), `15432` (PostgreSQL), `19000` (MinIO), and `19001` (MinIO Console), so app-level `.env` settings cannot remap them accidentally. + +See `.env.example` for the full configuration surface, including tracing, storage, rate limiting, and market-data settings. + +## Optional Capability Packs + +Install only the packages you need: + +```bash +pip install -e ".[embeddings]" +pip install -e ".[anthropic]" +pip install -e ".[celery]" +pip install -e ".[postgres]" +pip install -e ".[s3]" +pip install -e ".[market]" +pip install -e ".[monitoring]" +pip install -e ".[all]" +``` + +## Development + +```bash +make test +make eval +make fmt +make lint +make typecheck +make web-lint +make web-build +``` + +The repository is organized around a small number of clear surfaces: + +- `src/api/` for HTTP entry points and application wiring +- `src/pdf/` for extraction, rendering, tables, and OCR +- `src/finance/` for schemas, normalization, validation, and signal logic +- `src/agent/` for pipeline orchestration and state handling +- `src/market/` for event-study analysis and market providers +- `web/` for the Vue 3 dashboard +- `tests/` for API, storage, pipeline, frontend-adjacent, and integration coverage +- `docs/` for architecture, branch protection, and project notes + +## Contributing + +All changes land through pull requests. + +```bash +git checkout -b feat/ +bash scripts/local_ci.sh +git push -u origin HEAD +gh pr create --base main --fill +``` + +Before opening a PR, make sure local CI passes. The script covers Python linting, typing, tests, and the web checks that mirror CI. + +See `CONTRIBUTING.md`, `CODE_OF_CONDUCT.md`, `SECURITY.md`, and `docs/BRANCH_PROTECTION.md` for project policy and contribution details. + +## License + +MIT. See `LICENSE`. + +## Not Financial Advice + Jetbot produces structured extraction and analytical signals. It does not provide investment advice or recommend trades. \ No newline at end of file diff --git a/docker-compose.yml b/docker-compose.yml index 6f20a23..96c5579 100644 --- a/docker-compose.yml +++ b/docker-compose.yml @@ -7,7 +7,7 @@ services: args: EXTRAS: "celery,postgres,s3" ports: - - "${JETBOT_API_PORT:-8000}:8000" + - "18000:8000" env_file: - path: .env required: false @@ -47,7 +47,7 @@ services: redis: image: redis:7-alpine ports: - - "${JETBOT_REDIS_PORT:-6379}:6379" + - "16379:6379" postgres: image: postgres:16-alpine @@ -56,7 +56,7 @@ services: POSTGRES_PASSWORD: jetbot POSTGRES_DB: jetbot ports: - - "${JETBOT_POSTGRES_PORT:-5432}:5432" + - "15432:5432" volumes: - pg-data:/var/lib/postgresql/data @@ -67,8 +67,8 @@ services: MINIO_ROOT_USER: minio MINIO_ROOT_PASSWORD: minio123 ports: - - "${JETBOT_MINIO_PORT:-9000}:9000" - - "${JETBOT_MINIO_CONSOLE_PORT:-9001}:9001" + - "19000:9000" + - "19001:9001" volumes: - minio-data:/data diff --git a/scripts/open_ui_after_docker.py b/scripts/open_ui_after_docker.py index 129ab72..87c122b 100644 --- a/scripts/open_ui_after_docker.py +++ b/scripts/open_ui_after_docker.py @@ -17,9 +17,8 @@ def _env_flag(name: str, default: bool = True) -> bool: def main() -> int: - port = os.getenv("JETBOT_API_PORT", "8000") - health_url = os.getenv("JETBOT_HEALTHCHECK_URL", f"http://127.0.0.1:{port}/health") - ui_url = os.getenv("JETBOT_UI_URL", f"http://127.0.0.1:{port}/ui/") + health_url = os.getenv("JETBOT_HEALTHCHECK_URL", "http://127.0.0.1:18000/health") + ui_url = os.getenv("JETBOT_UI_URL", "http://127.0.0.1:18000/ui/") timeout_seconds = float(os.getenv("JETBOT_UI_OPEN_TIMEOUT", "90")) open_browser = _env_flag("JETBOT_OPEN_BROWSER", default=True) diff --git a/src/agent/nodes.py b/src/agent/nodes.py index a44fccb..02bb0d5 100644 --- a/src/agent/nodes.py +++ b/src/agent/nodes.py @@ -34,6 +34,7 @@ ) from src.storage.backend import StorageBackend, get_storage_backend from src.storage.vector_index import build_rag_index +from src.utils.document_metadata import enrich_document_meta from src.utils.ids import new_doc_id from src.utils.logging import get_logger, log_node from src.utils.time import monotonic_ms @@ -551,6 +552,7 @@ def finalize(state: AgentState) -> AgentState: state.debug.pop("_rag_index", None) store = get_storage_backend(state.data_dir or "data") + state.doc_meta = enrich_document_meta(state.doc_meta, state.pages) store.save_meta(state.doc_meta.doc_id, state.doc_meta) store.save_json(state.doc_meta.doc_id, "extracted/pages.json", [p.model_dump() for p in state.pages]) store.save_json(state.doc_meta.doc_id, "extracted/tables.json", [t.model_dump() for t in state.tables]) diff --git a/src/api/routes.py b/src/api/routes.py index 7fe176a..87bc64a 100644 --- a/src/api/routes.py +++ b/src/api/routes.py @@ -14,6 +14,7 @@ from src.agent.graph import build_graph, get_cached_state from src.agent.state import AgentState from src.api.auth import verify_api_key +from src.pdf.engine import get_pdf_engine from src.pdf.operations import ( delete_pages, extract_pages, @@ -24,6 +25,7 @@ from src.schemas.models import DocumentMeta from src.storage.backend import get_storage_backend from src.storage.task_store import TaskStore +from src.utils.document_metadata import enrich_document_meta from src.utils.ids import new_doc_id from src.utils.logging import get_logger @@ -118,12 +120,24 @@ def _validate_pdf_bytes(file: UploadFile, first_bytes: bytes) -> None: ) +def _load_enriched_meta(doc_id: str) -> DocumentMeta | None: + meta = store.load_meta(doc_id) + if meta is None: + return None + pages = store.load_json(doc_id, "extracted/pages.json") or [] + enriched = enrich_document_meta(meta, pages) + if enriched != meta: + store.save_meta(doc_id, enriched) + return enriched + + _AuthDep = Annotated[None, Depends(verify_api_key)] @router.post("/documents") async def create_document( _auth: _AuthDep, + background_tasks: BackgroundTasks, file: UploadFile = File(...), company: str | None = Form(default=None), period_end: str | None = Form(default=None), @@ -166,7 +180,7 @@ async def create_document( ) store.save_meta(doc_id, meta) task_store.create(doc_id) - return _ok({"doc_id": doc_id, "status": "queued"}) + return _ok(_start_analysis(meta, raw_path, background_tasks)) @router.get("/agent/capabilities") @@ -187,21 +201,41 @@ async def analyze_document( if not pdf_path.exists(): return _err("not_found", "PDF not found") + return _ok(_start_analysis(meta, pdf_path, background_tasks)) + + +def _start_analysis(meta: DocumentMeta, pdf_path: Path, background_tasks: BackgroundTasks) -> dict[str, Any]: + current = task_store.get(meta.doc_id) + if current is None: + current = task_store.create(meta.doc_id) + elif current["status"] == "running": + return current + from src.tasks import is_celery_backend - if is_celery_backend(): - from src.tasks.analysis import run_analysis + try: + if is_celery_backend(): + from src.tasks.analysis import run_analysis - run_analysis.delay(doc_id, str(pdf_path), meta.model_dump(mode="json")) - else: - background_tasks.add_task(_run_analysis, meta, str(pdf_path)) - task_store.update(doc_id, status="running", progress=5) - return _ok(task_store.get(doc_id)) + run_analysis.delay(meta.doc_id, str(pdf_path), meta.model_dump(mode="json")) + else: + background_tasks.add_task(_run_analysis, meta, str(pdf_path)) + except Exception as exc: + task_store.update(meta.doc_id, status="failed", progress=100, current_node=None, error_message=str(exc)) + raise HTTPException( + status_code=503, + detail={ + "ok": False, + "data": None, + "error": {"code": "analysis_start_failed", "message": f"Unable to start analysis: {exc}"}, + }, + ) from exc + return task_store.update(meta.doc_id, status="running", progress=5, current_node=None, error_message=None) @router.get("/documents/{doc_id}") async def get_document(_auth: _AuthDep, doc_id: str): - meta = store.load_meta(doc_id) + meta = _load_enriched_meta(doc_id) task = task_store.get(doc_id) if not meta or not task: return _err("not_found", "Document not found") @@ -277,6 +311,7 @@ async def list_documents( items: list[dict[str, Any]] = [] for meta in store.list_metas(): + meta = _load_enriched_meta(meta.doc_id) or meta task = task_store.get(meta.doc_id) items.append( { @@ -320,9 +355,36 @@ async def get_pages(_auth: _AuthDep, doc_id: str): return _ok(data) +@router.get("/documents/{doc_id}/pages/{page_number}/image") +async def get_page_image(_auth: _AuthDep, doc_id: str, page_number: int): + meta = store.load_meta(doc_id) + if meta is None: + return _err("not_found", "Document not found") + if page_number < 1: + return _err("bad_request", "page_number must be >= 1") + pdf_path = store.doc_dir(doc_id) / "raw.pdf" + if not pdf_path.exists(): + return _err("not_found", "PDF not found") + preview_dir = store.ensure_layout(doc_id)["pages"] / "pdfium_preview" + try: + image_path = get_pdf_engine("pdfium").render_page(str(pdf_path), page_number, str(preview_dir), dpi=144) + except ValueError as exc: + return _err("bad_request", str(exc)) + except RuntimeError as exc: + raise HTTPException( + status_code=500, + detail={ + "ok": False, + "data": None, + "error": {"code": "pdfium_unavailable", "message": str(exc)}, + }, + ) from exc + return FileResponse(image_path, media_type="image/png", headers={"X-PDF-Engine": "pdfium"}) + + @router.get("/documents/{doc_id}/pdf") async def get_pdf(_auth: _AuthDep, doc_id: str): - """Stream the original uploaded PDF so the SPA can preview it in an iframe.""" + """Stream the original uploaded PDF for direct links and downloads.""" meta = store.load_meta(doc_id) if meta is None: return _err("not_found", "Document not found") diff --git a/src/llm/base.py b/src/llm/base.py index 6bdc31e..d066db4 100644 --- a/src/llm/base.py +++ b/src/llm/base.py @@ -86,7 +86,7 @@ def _build_client(provider: str, model: str) -> LLMClient: else: from src.llm.openai_client import OpenAILLMClient - client = OpenAILLMClient(api_key=api_key or "ollama", model=model, base_url=base_url) + client = OpenAILLMClient(api_key=api_key or "ollama", model=model, base_url=base_url, provider=provider) else: # "mock" or unknown provider client = MockLLMClient() diff --git a/src/llm/openai_client.py b/src/llm/openai_client.py index ce97f4a..fd52126 100644 --- a/src/llm/openai_client.py +++ b/src/llm/openai_client.py @@ -32,13 +32,14 @@ class OpenAILLMClient: - def __init__(self, api_key: str, model: str, *, base_url: str | None = None) -> None: + def __init__(self, api_key: str, model: str, *, base_url: str | None = None, provider: str = "openai") -> None: _timeout = int(os.getenv("LLM_TIMEOUT_S", "60")) client_kwargs: dict[str, Any] = {"api_key": api_key, "timeout": float(_timeout)} if base_url: client_kwargs["base_url"] = base_url self._client = OpenAI(**client_kwargs) self._model = model + self._provider = provider.lower() self._chat_model = None if LANGCHAIN_AVAILABLE and ChatOpenAI is not None: chat_kwargs: dict[str, Any] = { @@ -63,6 +64,9 @@ async def chat(self, system: str, user: str, json_schema: dict | None = None) -> chain = prompt | self._chat_model message = await chain.ainvoke({"system_message": system, "user_message": user}) return _message_to_text(message) + if json_schema and not self._supports_native_structured_output(): + user = _with_json_instructions(user, json_schema) + json_schema = None return await asyncio.to_thread(self._chat_sync, system, user, json_schema) def _chat_sync(self, system: str, user: str, json_schema: dict | None) -> str: @@ -70,6 +74,9 @@ def _chat_sync(self, system: str, user: str, json_schema: dict | None) -> str: {"role": "system", "content": system}, {"role": "user", "content": user}, ] + if self._provider in {"deepseek", "ollama"}: + completion = self._client.chat.completions.create(model=self._model, messages=messages) # type: ignore[arg-type] + return completion.choices[0].message.content or "{}" if json_schema: try: response = self._client.responses.create( # type: ignore[call-overload] @@ -96,24 +103,29 @@ def invoke_structured( tags: list[str] | None = None, metadata: dict[str, Any] | None = None, ) -> Any: - if self._chat_model is not None and ChatPromptTemplate is not None: + if self._supports_native_structured_output() and self._chat_model is not None and ChatPromptTemplate is not None: prompt = ChatPromptTemplate.from_messages( [ ("system", request.system_template), ("human", request.user_template), ] ) - schema = request.output_model or request.output_schema or {"type": "object", "properties": {}} - chain = prompt | self._chat_model.with_structured_output(schema) + structured_schema = request.output_model or request.output_schema or {"type": "object", "properties": {}} + chain = prompt | self._chat_model.with_structured_output(structured_schema) return chain.invoke(request.input_values, config=_runnable_config(run_name, tags, metadata)) # type: ignore[arg-type] - schema = request.output_schema # type: ignore[assignment] - if schema is None and request.output_model is not None: - schema = request.output_model.model_json_schema() + json_schema: dict[str, Any] | None = request.output_schema + if json_schema is None and request.output_model is not None: + json_schema = request.output_model.model_json_schema() + user_prompt = _render_user_prompt(request.user_template, request.input_values) + native_schema: dict[str, Any] | None = json_schema + if json_schema is not None and not self._supports_native_structured_output(): + user_prompt = _with_json_instructions(user_prompt, json_schema) + native_schema = None text = self._chat_sync( request.system_template, - _render_user_prompt(request.user_template, request.input_values), - schema, # type: ignore[arg-type] + user_prompt, + native_schema, ) return _parse_fallback(text, request.output_model) @@ -127,7 +139,7 @@ def invoke_parallel( ) -> dict[str, Any]: if not requests: return {} - if self._chat_model is not None and ChatPromptTemplate is not None and RunnableParallel is not None: + if self._supports_native_structured_output() and self._chat_model is not None and ChatPromptTemplate is not None and RunnableParallel is not None: sample_input = next(iter(requests.values())).input_values if all(req.input_values == sample_input for req in requests.values()): chains: dict[str, Any] = {} @@ -161,6 +173,9 @@ def invoke_parallel( results[key] = future.result() return results + def _supports_native_structured_output(self) -> bool: + return self._provider == "openai" + def _message_to_text(message: BaseMessage | str) -> str: if isinstance(message, str): @@ -188,11 +203,18 @@ def _render_user_prompt(template: str, values: dict[str, Any]) -> str: return template +def _with_json_instructions(user_prompt: str, json_schema: dict[str, Any]) -> str: + schema_text = json.dumps(json_schema, ensure_ascii=False) + return ( + f"{user_prompt}\n\n" + "Return only valid JSON matching this JSON Schema. " + "Do not include markdown fences or explanatory text.\n" + f"JSON Schema:\n{schema_text}" + ) + + def _parse_fallback(text: str, output_model: Any) -> Any: - try: - parsed: Any = json.loads(text) - except json.JSONDecodeError: - parsed = {} + parsed = _load_json_payload(text) if output_model is None: return parsed try: @@ -205,6 +227,25 @@ def _parse_fallback(text: str, output_model: Any) -> Any: raise +def _load_json_payload(text: str) -> Any: + decoder = json.JSONDecoder() + stripped = text.strip() + try: + return json.loads(stripped) + except json.JSONDecodeError: + pass + + for start_char in ("{", "["): + start = stripped.find(start_char) + while start != -1: + try: + parsed, _end = decoder.raw_decode(stripped[start:]) + return parsed + except json.JSONDecodeError: + start = stripped.find(start_char, start + 1) + return {} + + def _runnable_config( run_name: str | None, tags: list[str] | None, diff --git a/src/schemas/models.py b/src/schemas/models.py index c5e8d3f..89ba18a 100644 --- a/src/schemas/models.py +++ b/src/schemas/models.py @@ -244,6 +244,21 @@ class AnalysisFinding(BaseModel): evidence: list[SourceRef] = Field(default_factory=list) confidence: float = Field(default=0.0, ge=0.0, le=1.0) + @field_validator("evidence", mode="before") + @classmethod + def _drop_invalid_evidence(cls, value: Any) -> Any: + if value is None: + return [] + if not isinstance(value, list): + return value + filtered: list[Any] = [] + for item in value: + if isinstance(item, SourceRef): + filtered.append(item) + elif isinstance(item, dict) and item.get("page") is not None: + filtered.append(item) + return filtered + class DeepAnalysisResult(BaseModel): model_config = ConfigDict(extra="forbid") diff --git a/src/storage/task_store.py b/src/storage/task_store.py index 0e66663..b8e7f0c 100644 --- a/src/storage/task_store.py +++ b/src/storage/task_store.py @@ -85,6 +85,9 @@ def update_node(self, doc_id: str, node_name: str) -> dict[str, Any]: def get(self, doc_id: str) -> dict[str, Any] | None: with self._lock: + # Long-lived API connections can otherwise keep reading an old WAL + # snapshot after the worker updates the same tasks.db file. + self._conn.rollback() row = self._conn.execute( "SELECT doc_id, status, progress, current_node, error_message FROM tasks WHERE doc_id = ?", (doc_id,) ).fetchone() diff --git a/src/utils/document_metadata.py b/src/utils/document_metadata.py new file mode 100644 index 0000000..fc7cfd1 --- /dev/null +++ b/src/utils/document_metadata.py @@ -0,0 +1,107 @@ +from __future__ import annotations + +import re +from collections.abc import Mapping, Sequence +from datetime import date, datetime +from pathlib import Path +from typing import Any + +from src.schemas.models import DocumentMeta, Page + +_MONTH_DATE_RE = re.compile( + r"\b(January|February|March|April|May|June|July|August|September|October|November|December)\s+" + r"([0-3]?\d),\s*(20\d{2}|19\d{2})\b", + re.IGNORECASE, +) +_ISO_DATE_RE = re.compile(r"\b(20\d{2}|19\d{2})[-/](0?[1-9]|1[0-2])[-/]([0-3]?\d)\b") +_COMPANY_SUFFIX_RE = re.compile( + r"\b(Inc\.|Corporation|Corp\.|Company|Co\.|Limited|Ltd\.|Holdings|Group|PLC|S\.A\.)(?=$|[\s,])", + re.IGNORECASE, +) + + +def enrich_document_meta(meta: DocumentMeta, pages: Sequence[Page | Mapping[str, Any]] | None) -> DocumentMeta: + text = _pages_text(pages) + updates: dict[str, Any] = {} + if not meta.company: + company = _infer_company(text, meta.filename) + if company: + updates["company"] = company + if not meta.report_type: + report_type = _infer_report_type(text, meta.filename) + if report_type: + updates["report_type"] = report_type + if meta.period_end is None: + period_end = _infer_period_end(text, meta.filename) + if period_end: + updates["period_end"] = period_end + return meta.model_copy(update=updates) if updates else meta + + +def _pages_text(pages: Sequence[Page | Mapping[str, Any]] | None) -> str: + if not pages: + return "" + chunks: list[str] = [] + for page in pages[:3]: + if isinstance(page, Page): + chunks.append(page.text) + elif isinstance(page, Mapping): + text = page.get("text") + if isinstance(text, str): + chunks.append(text) + return "\n".join(chunks) + + +def _infer_company(text: str, filename: str) -> str | None: + for line in _nonempty_lines(text)[:30]: + cleaned = line.strip(" :\t") + if len(cleaned) > 80: + continue + if _COMPANY_SUFFIX_RE.search(cleaned): + return cleaned + name = Path(filename).stem.replace("_", " ").replace("-", " ") + name = re.sub( + r"\b(FY\d{2,4}|FY|Q[1-4]|\d{2,4}|annual|quarterly|report|financial|statements|consolidated)\b", + " ", + name, + flags=re.IGNORECASE, + ) + name = re.sub(r"\s+", " ", name).strip() + return name.title() if name else None + + +def _infer_report_type(text: str, filename: str) -> str | None: + upper = f"{text}\n{filename}".upper() + if "FORM 10-K" in upper or "ANNUAL REPORT" in upper: + return "Annual Report" + if "FORM 10-Q" in upper or "QUARTERLY REPORT" in upper: + return "Quarterly Report" + if "CONDENSED CONSOLIDATED" in upper and "STATEMENT" in upper: + return "Condensed Consolidated Financial Statements" + if "CONSOLIDATED" in upper and "FINANCIAL" in upper and "STATEMENT" in upper: + return "Consolidated Financial Statements" + if "FINANCIAL" in upper and "STATEMENT" in upper: + return "Financial Statements" + return None + + +def _infer_period_end(text: str, filename: str) -> date | None: + haystack = re.sub(r"\s+", " ", f"{text}\n{filename}") + month_match = _MONTH_DATE_RE.search(haystack) + if month_match: + try: + return datetime.strptime(month_match.group(0), "%B %d, %Y").date() + except ValueError: + return None + iso_match = _ISO_DATE_RE.search(haystack) + if iso_match: + year, month, day = iso_match.groups() + try: + return date(int(year), int(month), int(day)) + except ValueError: + return None + return None + + +def _nonempty_lines(text: str) -> list[str]: + return [line.strip() for line in text.splitlines() if line.strip()] \ No newline at end of file diff --git a/tests/test_openai_compatible_client.py b/tests/test_openai_compatible_client.py new file mode 100644 index 0000000..4d943af --- /dev/null +++ b/tests/test_openai_compatible_client.py @@ -0,0 +1,71 @@ +from __future__ import annotations + +from typing import Any + +from pydantic import BaseModel + +from src.llm.base import StructuredPromptRequest +from src.llm.openai_client import OpenAILLMClient + + +class _StructuredAnswer(BaseModel): + summary: str + + +class _FakeMessage: + content = '{"summary":"ok"}' + + +class _FakeChoice: + message = _FakeMessage() + + +class _FakeCompletion: + choices = [_FakeChoice()] + + +class _FakeCompletions: + def __init__(self) -> None: + self.calls: list[dict[str, Any]] = [] + + def create(self, **kwargs: Any) -> _FakeCompletion: + self.calls.append(kwargs) + return _FakeCompletion() + + +class _FakeChat: + def __init__(self) -> None: + self.completions = _FakeCompletions() + + +class _FakeResponses: + def create(self, **_kwargs: Any) -> None: + raise AssertionError("OpenAI-compatible fallback must not call responses.create") + + +class _FakeOpenAI: + def __init__(self) -> None: + self.chat = _FakeChat() + self.responses = _FakeResponses() + + +def test_deepseek_structured_output_uses_plain_chat_completion() -> None: + client = OpenAILLMClient(api_key="test", model="deepseek-v4-flash", provider="deepseek") + fake_client = _FakeOpenAI() + client._client = fake_client # type: ignore[assignment] + client._chat_model = None + + result = client.invoke_structured( + StructuredPromptRequest( + system_template="You answer JSON.", + user_template="Summarize {topic}.", + input_values={"topic": "finance"}, + output_model=_StructuredAnswer, + ) + ) + + assert result.summary == "ok" + assert len(fake_client.chat.completions.calls) == 1 + call = fake_client.chat.completions.calls[0] + assert "response_format" not in call + assert "JSON Schema" in call["messages"][1]["content"] \ No newline at end of file diff --git a/tests/test_routes_web.py b/tests/test_routes_web.py index 2161347..9b802a6 100644 --- a/tests/test_routes_web.py +++ b/tests/test_routes_web.py @@ -2,6 +2,7 @@ from __future__ import annotations +import json from pathlib import Path import pytest @@ -89,7 +90,62 @@ def test_list_pagination(client: TestClient, tmp_path: Path) -> None: assert len(data["items"]) == 2 -def test_upload_endpoint_creates_doc_layout(client: TestClient, tmp_path: Path) -> None: +def test_list_enriches_missing_metadata_from_pages(client: TestClient, tmp_path: Path) -> None: + base = tmp_path / "data" + _make_doc(base, "abc123") + meta_path = base / "abc123" / "meta.json" + meta_path.write_text( + json.dumps( + { + "doc_id": "abc123", + "filename": "FY24_Q4_Consolidated_Financial_Statements.pdf", + "company": None, + "period_end": None, + "report_type": None, + "language": None, + "created_at": "2026-05-21T00:00:00Z", + } + ), + encoding="utf-8", + ) + (base / "abc123" / "extracted" / "pages.json").write_text( + json.dumps( + [ + { + "page_number": 1, + "text": "Apple Inc.\nCONDENSED CONSOLIDATED STATEMENTS OF OPERATIONS\nSeptember 28,\n2024", + "images": [], + } + ] + ), + encoding="utf-8", + ) + + r = client.get("/v1/documents") + + assert r.status_code == 200 + meta = r.json()["data"]["items"][0]["meta"] + assert meta["company"] == "Apple Inc." + assert meta["report_type"] == "Condensed Consolidated Financial Statements" + assert meta["period_end"] == "2024-09-28" + saved = json.loads(meta_path.read_text(encoding="utf-8")) + assert saved["company"] == "Apple Inc." + + +def test_upload_endpoint_creates_doc_layout(client: TestClient, tmp_path: Path, monkeypatch: pytest.MonkeyPatch) -> None: + import src.api.routes as routes_mod + + def fake_start_analysis(meta: object, pdf_path: Path, background_tasks: object) -> dict[str, object]: + return { + "doc_id": getattr(meta, "doc_id"), + "status": "running", + "progress": 5, + "current_node": None, + "error_message": None, + } + + monkeypatch.setattr(routes_mod, "_start_analysis", fake_start_analysis) + r = client.post( "/v1/documents", files={"file": ("demo.pdf", b"%PDF-1.4\n%fake upload\n%%EOF\n", "application/pdf")}, @@ -99,7 +155,7 @@ def test_upload_endpoint_creates_doc_layout(client: TestClient, tmp_path: Path) assert r.status_code == 200 body = r.json() assert body["ok"] is True - assert body["data"]["status"] == "queued" + assert body["data"]["status"] == "running" doc_id = body["data"]["doc_id"] doc_dir = tmp_path / "data" / doc_id @@ -186,6 +242,28 @@ def test_pdf_endpoint_streams_bytes(client: TestClient, tmp_path: Path) -> None: assert r.content.startswith(b"%PDF") +def test_pdf_page_image_endpoint_renders_with_pdfium(client: TestClient, tmp_path: Path) -> None: + pytest.importorskip("pypdfium2") + fitz = pytest.importorskip("fitz") + + base = tmp_path / "data" + _make_doc(base, "abc123") + raw_pdf = base / "abc123" / "raw.pdf" + doc = fitz.open() + page = doc.new_page(width=300, height=240) + page.insert_text((36, 72), "page one", fontsize=12) + doc.save(str(raw_pdf)) + doc.close() + + r = client.get("/v1/documents/abc123/pages/1/image") + + assert r.status_code == 200 + assert r.headers["content-type"] == "image/png" + assert r.headers["x-pdf-engine"] == "pdfium" + assert r.content.startswith(b"\x89PNG") + assert (base / "abc123" / "pages" / "pdfium_preview" / "page_0001.png").exists() + + def test_pdf_missing_returns_404(client: TestClient, tmp_path: Path) -> None: _make_doc(tmp_path / "data", "abc123", with_pdf=False) r = client.get("/v1/documents/abc123/pdf") diff --git a/tests/test_schemas.py b/tests/test_schemas.py index 0491d68..f2c0ac7 100644 --- a/tests/test_schemas.py +++ b/tests/test_schemas.py @@ -1,9 +1,39 @@ from __future__ import annotations -from src.schemas.models import SourceRef +from src.schemas.models import DeepAnalysisResult, SourceRef def test_source_ref_quote_trim(): ref = SourceRef(ref_type="page_text", page=1, table_id=None, quote="a" * 250, confidence=0.5) assert ref.quote is not None - assert len(ref.quote) == 200 + assert len(ref.quote) == 200 + + +def test_deep_analysis_drops_evidence_without_page(): + result = DeepAnalysisResult.model_validate( + { + "doc_id": "doc-1", + "provider": "deepseek", + "model": "deepseek-v4-flash", + "summary": "ok", + "findings": [ + { + "finding_id": "f1", + "category": "overview", + "title": "Finding", + "severity": "low", + "summary": "summary", + "evidence": [ + {"ref_type": "page_text", "page": None, "quote": "missing page", "confidence": 0.2}, + {"ref_type": "page_text", "page": 1, "quote": "valid page", "confidence": 0.8}, + ], + "confidence": 0.7, + } + ], + "limitations": [], + "invocations": [], + } + ) + + assert len(result.findings[0].evidence) == 1 + assert result.findings[0].evidence[0].page == 1 diff --git a/tests/test_storage.py b/tests/test_storage.py index 8179778..1c3a0bf 100644 --- a/tests/test_storage.py +++ b/tests/test_storage.py @@ -27,6 +27,20 @@ def test_update(self, tmp_path: Path): assert task["status"] == "running" assert task["progress"] == 50 + def test_get_sees_updates_from_another_store(self, tmp_path: Path): + api_store = TaskStore(str(tmp_path)) + worker_store = TaskStore(str(tmp_path)) + api_store.create("doc-1") + api_store.update("doc-1", status="running", progress=10) + + assert api_store.get("doc-1") is not None + worker_store.update("doc-1", status="succeeded", progress=100) + + task = api_store.get("doc-1") + assert task is not None + assert task["status"] == "succeeded" + assert task["progress"] == 100 + def test_update_error_message(self, tmp_path: Path): store = TaskStore(str(tmp_path)) store.create("doc-1") diff --git a/web/src/api/docs.ts b/web/src/api/docs.ts index be86a3f..13c3e23 100644 --- a/web/src/api/docs.ts +++ b/web/src/api/docs.ts @@ -5,6 +5,7 @@ import type { AgentRun, AnalysisFinding, DeepAnalysisResult, + ExtractedPage, ExtractedTable, FinancialStatements, KeyNote, @@ -192,6 +193,9 @@ export const docsApi = { tables(docId: string) { return unwrap(http.get(`/v1/documents/${docId}/tables`)).then(normalizeTables) }, + pages(docId: string) { + return unwrap(http.get(`/v1/documents/${docId}/pages`)) + }, deepAnalysis(docId: string) { return unwrap(http.get(`/v1/documents/${docId}/deep-analysis`)).then(normalizeDeepAnalysis) }, @@ -204,6 +208,11 @@ export const docsApi = { pdfBlob(docId: string) { return http.get(`/v1/documents/${docId}/pdf`, { responseType: 'blob' }).then((r) => r.data as Blob) }, + pageImageBlob(docId: string, page: number) { + return http + .get(`/v1/documents/${docId}/pages/${page}/image`, { responseType: 'blob' }) + .then((r) => r.data as Blob) + }, pdfUrl(docId: string) { // Keep the raw route available for explicit downloads or direct linking. return buildApiUrl(`/v1/documents/${docId}/pdf`) diff --git a/web/src/api/types.ts b/web/src/api/types.ts index 709201c..776886f 100644 --- a/web/src/api/types.ts +++ b/web/src/api/types.ts @@ -76,9 +76,18 @@ export interface ExtractedTable { table_id: string page: number title?: string | null + n_rows?: number | null + n_cols?: number | null + raw_markdown?: string | null cells: TableCell[] } +export interface ExtractedPage { + page_number: number + text: string + images?: string[] +} + export interface PdfOperationResult { doc_id: string revision_id: string diff --git a/web/src/components/PdfViewer.vue b/web/src/components/PdfViewer.vue index 1326af8..3d7262f 100644 --- a/web/src/components/PdfViewer.vue +++ b/web/src/components/PdfViewer.vue @@ -4,7 +4,9 @@ import { docsApi } from '@/api/docs' const props = defineProps<{ docId: string; page: number | null }>() -const objectUrl = ref('') +const imageUrl = ref('') +const pageNumber = ref(props.page || 1) +const totalPages = ref(null) const loading = ref(false) const error = ref('') const hasRequestedPreview = ref(false) @@ -17,21 +19,25 @@ function replaceObjectUrl(nextUrl: string) { URL.revokeObjectURL(currentObjectUrl) } currentObjectUrl = nextUrl || null - objectUrl.value = nextUrl + imageUrl.value = nextUrl } -async function loadPdf(docId: string) { +async function loadPreview(docId: string) { const token = ++requestToken hasRequestedPreview.value = true loading.value = true error.value = '' try { - const pdfBlob = await docsApi.pdfBlob(docId) + const [pages, imageBlob] = await Promise.all([ + docsApi.pages(docId).catch(() => []), + docsApi.pageImageBlob(docId, pageNumber.value), + ]) if (token !== requestToken) { return } - replaceObjectUrl(URL.createObjectURL(pdfBlob)) + totalPages.value = pages.length ? pages.length : totalPages.value + replaceObjectUrl(URL.createObjectURL(imageBlob)) } catch (e: any) { if (token !== requestToken) { return @@ -45,6 +51,14 @@ async function loadPdf(docId: string) { } } +function goToPage(nextPage: number) { + const maxPage = totalPages.value || Number.MAX_SAFE_INTEGER + pageNumber.value = Math.max(1, Math.min(nextPage, maxPage)) + if (hasRequestedPreview.value) { + void loadPreview(props.docId) + } +} + watch( () => props.docId, () => { @@ -53,6 +67,19 @@ watch( loading.value = false error.value = '' hasRequestedPreview.value = false + pageNumber.value = props.page || 1 + totalPages.value = null + }, +) + +watch( + () => props.page, + (page) => { + if (!page || page === pageNumber.value) return + pageNumber.value = page + if (hasRequestedPreview.value) { + void loadPreview(props.docId) + } }, ) @@ -60,21 +87,27 @@ onBeforeUnmount(() => { replaceObjectUrl('') }) -const src = computed(() => { - const base = objectUrl.value - if (!base) return '' - // PDF.js viewers in Chrome/Edge honor #page= anchors. - return props.page ? `${base}#page=${props.page}` : base -}) +const pageLabel = computed(() => (totalPages.value ? `第 ${pageNumber.value} / ${totalPages.value} 页` : `第 ${pageNumber.value} 页`))