-
Notifications
You must be signed in to change notification settings - Fork 35
β‘ Bolt: Offload blocking I/O to thread pool in async handlers #696
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. Weβll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: main
Are you sure you want to change the base?
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -5,6 +5,7 @@ | |
| """ | ||
|
|
||
| from fastapi import APIRouter, Depends, HTTPException, UploadFile, File, Form, Response | ||
| from fastapi.concurrency import run_in_threadpool | ||
| from sqlalchemy.orm import Session | ||
| from sqlalchemy import func, case | ||
| from typing import List, Optional | ||
|
|
@@ -281,7 +282,10 @@ async def upload_visit_images( | |
| Maximum 10 images per visit | ||
| """ | ||
| try: | ||
| visit = db.query(FieldOfficerVisit).filter(FieldOfficerVisit.id == visit_id).first() | ||
| # Performance Optimization: Wrap blocking DB query in threadpool | ||
| visit = await run_in_threadpool( | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. P1: SQLAlchemy Session/ORM object Extract a single synchronous helper that performs the query, mutation, and commit atomically within one thread, and call it via Prompt for AI agents |
||
| lambda: db.query(FieldOfficerVisit).filter(FieldOfficerVisit.id == visit_id).first() | ||
| ) | ||
|
Comment on lines
+285
to
+288
|
||
|
|
||
| if not visit: | ||
| raise HTTPException(status_code=404, detail=f"Visit {visit_id} not found") | ||
|
|
@@ -337,8 +341,12 @@ async def upload_visit_images( | |
| file_path = os.path.join(VISIT_IMAGES_DIR, safe_filename) | ||
|
|
||
| # Save file | ||
| with open(file_path, 'wb') as f: | ||
| f.write(content) | ||
| # Performance Optimization: Wrap blocking File I/O in threadpool | ||
| def _save_image(p, c): | ||
| with open(p, 'wb') as f: | ||
| f.write(c) | ||
|
|
||
| await run_in_threadpool(_save_image, file_path, content) | ||
|
|
||
| # Store relative path | ||
| relative_path = os.path.join("data", "visit_images", safe_filename) | ||
|
|
@@ -349,7 +357,8 @@ async def upload_visit_images( | |
| visit.visit_images = existing_images | ||
| visit.updated_at = datetime.now(timezone.utc) | ||
|
|
||
| db.commit() | ||
| # Performance Optimization: Wrap blocking DB commit in threadpool | ||
| await run_in_threadpool(db.commit) | ||
|
Comment on lines
357
to
+361
|
||
|
|
||
| logger.info(f"Uploaded {len(images)} images for visit {visit_id}") | ||
|
|
||
|
|
||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -265,8 +265,10 @@ def _save_audio_file(): | |
| prev_hash = blockchain_last_hash_cache.get("last_hash") | ||
| if prev_hash is None: | ||
| # Cache miss: Fetch only the last hash from DB | ||
| # Use await run_in_threadpool for DB query if needed, or just do it in-thread | ||
| prev_issue = db.query(Issue.integrity_hash).order_by(Issue.id.desc()).first() | ||
| # Performance Optimization: Wrap blocking DB query in threadpool | ||
| prev_issue = await run_in_threadpool( | ||
| lambda: db.query(Issue.integrity_hash).order_by(Issue.id.desc()).first() | ||
| ) | ||
| prev_hash = prev_issue[0] if prev_issue and prev_issue[0] else "" | ||
| blockchain_last_hash_cache.set(data=prev_hash, key="last_hash") | ||
|
|
||
|
|
@@ -300,10 +302,8 @@ def _save_audio_file(): | |
| audio_file_path=relative_audio_path # Store relative path | ||
| ) | ||
|
|
||
| # Standard synchronous DB operations for simplicity and thread-safety | ||
| db.add(new_issue) | ||
| db.commit() | ||
| db.refresh(new_issue) | ||
| # Performance Optimization: Wrap blocking DB operations in threadpool to keep event loop responsive | ||
| await run_in_threadpool(save_issue_db, db, new_issue) | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. P1: Do not pass the same SQLAlchemy Session instance into Prompt for AI agents |
||
|
|
||
| # Update cache for next report AFTER successful DB commit | ||
| blockchain_last_hash_cache.set(data=integrity_hash, key="last_hash") | ||
|
Comment on lines
+305
to
309
|
||
|
|
||
| Original file line number | Diff line number | Diff line change | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|
| @@ -1,6 +1,7 @@ | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| import logging | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| import json | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| import os | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| from fastapi.concurrency import run_in_threadpool | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| from pywebpush import webpush, WebPushException | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| from backend.database import SessionLocal | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| from backend.models import Issue, PushSubscription | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
|
@@ -18,11 +19,14 @@ async def process_action_plan_background(issue_id: int, description: str, catego | |||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| action_plan = await generate_action_plan(description, category, language, image_path) | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| # Update issue in DB | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| issue = db.query(Issue).filter(Issue.id == issue_id).first() | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| # Performance Optimization: Wrap blocking DB operations in threadpool | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| issue = await run_in_threadpool( | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. P1: The Extract all DB work (query + mutation + commit + close) into a single synchronous helper and invoke it once via Prompt for AI agents |
||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| lambda: db.query(Issue).filter(Issue.id == issue_id).first() | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| ) | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| if issue: | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| current_plan = issue.action_plan or {} | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| issue.action_plan = {**current_plan, **action_plan} | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| db.commit() | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| await run_in_threadpool(db.commit) | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
Comment on lines
+22
to
+29
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. π§© Analysis chainπ Script executed: #!/bin/bash
# Description: Find async functions that create/use SessionLocal and offload only individual DB operations.
rg -n -C4 'async def|SessionLocal\(|run_in_threadpool\(|db\.commit|db\.close' backend/tasks.pyRepository: RohanExploit/VishwaGuru Length of output: 2617 Move the entire SQLAlchemy update into one threadpool helper. The current code creates β»οΈ Proposed refactor+def _merge_action_plan_into_issue(issue_id: int, action_plan: dict) -> bool:
+ db = SessionLocal()
+ try:
+ issue = db.query(Issue).filter(Issue.id == issue_id).first()
+ if not issue:
+ return False
+
+ current_plan = issue.action_plan or {}
+ issue.action_plan = {**current_plan, **action_plan}
+ db.commit()
+ return True
+ finally:
+ db.close()
+
async def process_action_plan_background(issue_id: int, description: str, category: str, language: str, image_path: str):
- db = SessionLocal()
try:
# Generate Action Plan (AI)
action_plan = await generate_action_plan(description, category, language, image_path)
# Update issue in DB
- # Performance Optimization: Wrap blocking DB operations in threadpool
- issue = await run_in_threadpool(
- lambda: db.query(Issue).filter(Issue.id == issue_id).first()
- )
- if issue:
- current_plan = issue.action_plan or {}
- issue.action_plan = {**current_plan, **action_plan}
- await run_in_threadpool(db.commit)
+ updated = await run_in_threadpool(_merge_action_plan_into_issue, issue_id, action_plan)
+ if updated:
# Invalidate cache to ensure users get the updated action plan
recent_issues_cache.clear()
except Exception as e:
logger.error(f"Background action plan generation failed for issue {issue_id}: {e}", exc_info=True)
- finally:
- db.close()π Committable suggestion
Suggested change
π€ Prompt for AI Agents |
||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
Comment on lines
+22
to
30
|
||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| # Invalidate cache to ensure users get the updated action plan | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| recent_issues_cache.clear() | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
|
@@ -31,8 +35,13 @@ async def process_action_plan_background(issue_id: int, description: str, catego | |||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| finally: | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| db.close() | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| async def create_grievance_from_issue_background(issue_id: int): | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| """Background task to create a grievance from an issue for escalation management""" | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| def create_grievance_from_issue_background(issue_id: int): | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| """ | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| Background task to create a grievance from an issue for escalation management. | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| Performance Optimization: Changed to synchronous function since it only performs | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| blocking DB operations, allowing FastAPI to run it in a threadpool automatically | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| when added via BackgroundTasks. | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| """ | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| db = SessionLocal() | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| try: | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| # Get the issue | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Use the actual PR date for this learning entry.
Line 85 is dated
2026-06-12, which is in the future relative to this PR/current date: April 22, 2026.π Proposed fix
π Committable suggestion
π€ Prompt for AI Agents