From 69ff2d5f13a24893a689564e7d8163ad7ee976a7 Mon Sep 17 00:00:00 2001 From: RohanExploit <178623867+RohanExploit@users.noreply.github.com> Date: Wed, 22 Apr 2026 14:08:46 +0000 Subject: [PATCH] =?UTF-8?q?=E2=9A=A1=20Bolt:=20Offload=20blocking=20I/O=20?= =?UTF-8?q?to=20thread=20pool=20in=20async=20handlers?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit - Wrapped synchronous DB and File I/O in `run_in_threadpool` for `voice` and `field_officer` routers. - Optimized background tasks by leveraging standard `def` for blocking-only logic. - Ensured event loop responsiveness during heavy submission paths. --- .jules/bolt.md | 4 ++++ backend/routers/field_officer.py | 17 +++++++++++++---- backend/routers/voice.py | 12 ++++++------ backend/tasks.py | 17 +++++++++++++---- 4 files changed, 36 insertions(+), 14 deletions(-) diff --git a/.jules/bolt.md b/.jules/bolt.md index ddd78ae2..04a616b9 100644 --- a/.jules/bolt.md +++ b/.jules/bolt.md @@ -81,3 +81,7 @@ ## 2026-05-15 - Serialization Caching Bypass **Learning:** Caching raw Python objects (like SQLAlchemy models or Pydantic instances) in a high-traffic API still incurs significant overhead because FastAPI/Pydantic must re-serialize the data on every request. **Action:** Serialize data to a JSON string using `json.dumps()` BEFORE caching. On cache hits, return a raw `fastapi.Response(content=..., media_type="application/json")`. This bypasses the validation and serialization layer, resulting in significant performance gains (up to 50x in benchmarks). + +## 2026-06-12 - Threadpool Offloading for Tail Latency +**Learning:** Mixed I/O operations (Database and File System) in FastAPI `async def` endpoints block the event loop, causing severe tail latency spikes under concurrency. Explicitly offloading these to `run_in_threadpool` is essential for maintaining responsiveness. +**Action:** Wrap all synchronous DB and File I/O operations in `run_in_threadpool`. For purely blocking background tasks, use standard `def` instead of `async def` to leverage FastAPI's automatic threadpool execution. diff --git a/backend/routers/field_officer.py b/backend/routers/field_officer.py index 8977d28a..cb0248b0 100644 --- a/backend/routers/field_officer.py +++ b/backend/routers/field_officer.py @@ -5,6 +5,7 @@ """ from fastapi import APIRouter, Depends, HTTPException, UploadFile, File, Form, Response +from fastapi.concurrency import run_in_threadpool from sqlalchemy.orm import Session from sqlalchemy import func, case from typing import List, Optional @@ -281,7 +282,10 @@ async def upload_visit_images( Maximum 10 images per visit """ try: - visit = db.query(FieldOfficerVisit).filter(FieldOfficerVisit.id == visit_id).first() + # Performance Optimization: Wrap blocking DB query in threadpool + visit = await run_in_threadpool( + lambda: db.query(FieldOfficerVisit).filter(FieldOfficerVisit.id == visit_id).first() + ) if not visit: raise HTTPException(status_code=404, detail=f"Visit {visit_id} not found") @@ -337,8 +341,12 @@ async def upload_visit_images( file_path = os.path.join(VISIT_IMAGES_DIR, safe_filename) # Save file - with open(file_path, 'wb') as f: - f.write(content) + # Performance Optimization: Wrap blocking File I/O in threadpool + def _save_image(p, c): + with open(p, 'wb') as f: + f.write(c) + + await run_in_threadpool(_save_image, file_path, content) # Store relative path relative_path = os.path.join("data", "visit_images", safe_filename) @@ -349,7 +357,8 @@ async def upload_visit_images( visit.visit_images = existing_images visit.updated_at = datetime.now(timezone.utc) - db.commit() + # Performance Optimization: Wrap blocking DB commit in threadpool + await run_in_threadpool(db.commit) logger.info(f"Uploaded {len(images)} images for visit {visit_id}") diff --git a/backend/routers/voice.py b/backend/routers/voice.py index 264a0643..df056c42 100644 --- a/backend/routers/voice.py +++ b/backend/routers/voice.py @@ -265,8 +265,10 @@ def _save_audio_file(): prev_hash = blockchain_last_hash_cache.get("last_hash") if prev_hash is None: # Cache miss: Fetch only the last hash from DB - # Use await run_in_threadpool for DB query if needed, or just do it in-thread - prev_issue = db.query(Issue.integrity_hash).order_by(Issue.id.desc()).first() + # Performance Optimization: Wrap blocking DB query in threadpool + prev_issue = await run_in_threadpool( + lambda: db.query(Issue.integrity_hash).order_by(Issue.id.desc()).first() + ) prev_hash = prev_issue[0] if prev_issue and prev_issue[0] else "" blockchain_last_hash_cache.set(data=prev_hash, key="last_hash") @@ -300,10 +302,8 @@ def _save_audio_file(): audio_file_path=relative_audio_path # Store relative path ) - # Standard synchronous DB operations for simplicity and thread-safety - db.add(new_issue) - db.commit() - db.refresh(new_issue) + # Performance Optimization: Wrap blocking DB operations in threadpool to keep event loop responsive + await run_in_threadpool(save_issue_db, db, new_issue) # Update cache for next report AFTER successful DB commit blockchain_last_hash_cache.set(data=integrity_hash, key="last_hash") diff --git a/backend/tasks.py b/backend/tasks.py index 9b0b87ba..15e25690 100644 --- a/backend/tasks.py +++ b/backend/tasks.py @@ -1,6 +1,7 @@ import logging import json import os +from fastapi.concurrency import run_in_threadpool from pywebpush import webpush, WebPushException from backend.database import SessionLocal from backend.models import Issue, PushSubscription @@ -18,11 +19,14 @@ async def process_action_plan_background(issue_id: int, description: str, catego action_plan = await generate_action_plan(description, category, language, image_path) # Update issue in DB - issue = db.query(Issue).filter(Issue.id == issue_id).first() + # Performance Optimization: Wrap blocking DB operations in threadpool + issue = await run_in_threadpool( + lambda: db.query(Issue).filter(Issue.id == issue_id).first() + ) if issue: current_plan = issue.action_plan or {} issue.action_plan = {**current_plan, **action_plan} - db.commit() + await run_in_threadpool(db.commit) # Invalidate cache to ensure users get the updated action plan recent_issues_cache.clear() @@ -31,8 +35,13 @@ async def process_action_plan_background(issue_id: int, description: str, catego finally: db.close() -async def create_grievance_from_issue_background(issue_id: int): - """Background task to create a grievance from an issue for escalation management""" +def create_grievance_from_issue_background(issue_id: int): + """ + Background task to create a grievance from an issue for escalation management. + Performance Optimization: Changed to synchronous function since it only performs + blocking DB operations, allowing FastAPI to run it in a threadpool automatically + when added via BackgroundTasks. + """ db = SessionLocal() try: # Get the issue