From bc9bd5c27982a56e75301a9d40a7741de1a0bb90 Mon Sep 17 00:00:00 2001 From: Eugene Shershen Date: Sun, 10 May 2026 17:20:57 +0300 Subject: [PATCH 1/4] added concurrent operations are not permitted --- .github/workflows/ci.yml | 36 ++++++++ tests/test_concurrent_queries_postgres.py | 108 ++++++++++++++++++++++ 2 files changed, 144 insertions(+) create mode 100644 tests/test_concurrent_queries_postgres.py diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index f87750d..fe26f17 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -72,6 +72,42 @@ jobs: file: ./coverage.xml token: ${{ secrets.CODECOV_TOKEN }} + test-postgres: + name: test-postgres + runs-on: ubuntu-latest + services: + postgres: + image: postgres:16 + env: + POSTGRES_USER: test + POSTGRES_PASSWORD: test + POSTGRES_DB: test + ports: + - 5432:5432 + options: >- + --health-cmd pg_isready + --health-interval 10s + --health-timeout 5s + --health-retries 5 + steps: + - name: Checkout repository + uses: actions/checkout@v4 + + - name: Set up Python + uses: actions/setup-python@v5 + with: + python-version: '3.12' + + - name: Install dependencies + run: | + python -m pip install --upgrade pip wheel + pip install -r requirements.txt + + - name: Run PostgreSQL concurrent-queries tests + env: + POSTGRES_TEST_URL: postgresql+asyncpg://test:test@localhost:5432/test + run: pytest tests/test_concurrent_queries_postgres.py -v + ruff: name: ruff runs-on: ubuntu-latest diff --git a/tests/test_concurrent_queries_postgres.py b/tests/test_concurrent_queries_postgres.py new file mode 100644 index 0000000..c640dca --- /dev/null +++ b/tests/test_concurrent_queries_postgres.py @@ -0,0 +1,108 @@ +"""Real PostgreSQL/asyncpg reproduction of the `isce` concurrent-operations error. + +The production traceback originates here:: + + sqlalchemy.exc.InvalidRequestError: This session is provisioning a new + connection; concurrent operations are not permitted + (https://sqlalche.me/e/20/isce) + +The error is triggered by ``await asyncio.gather(session.execute(...), +session.execute(...))`` against a single AsyncSession backed by asyncpg. +SQLite/aiosqlite serialises internally and does NOT reliably reproduce it +(see ``test_concurrent_queries.py``), so this module targets a real Postgres +instance and is skipped unless ``POSTGRES_TEST_URL`` is set. + +Run locally:: + + POSTGRES_TEST_URL="postgresql+asyncpg://user:pass@localhost:5432/test" \\ + pytest tests/test_concurrent_queries_postgres.py -v +""" + +import asyncio +import os +import uuid + +import pytest +from sqlalchemy import text + +POSTGRES_URL = os.getenv("POSTGRES_TEST_URL") + +pytestmark = pytest.mark.skipif( + not POSTGRES_URL, + reason="POSTGRES_TEST_URL not set; this test requires a real PostgreSQL/asyncpg instance", +) + + +@pytest.fixture +def table_name(): + return f"isce_repro_{uuid.uuid4().hex[:8]}" + + +@pytest.fixture +async def setup_table(app, db, SQLAlchemyMiddleware, table_name): + SQLAlchemyMiddleware(app, db_url=POSTGRES_URL) + async with db(commit_on_exit=True): + await db.session.execute( + text(f"CREATE TABLE {table_name} (id SERIAL PRIMARY KEY, name TEXT NOT NULL)") + ) + await db.session.execute( + text(f"INSERT INTO {table_name} (name) SELECT 'row_' || g FROM generate_series(1, 50) g") + ) + try: + yield + finally: + async with db(commit_on_exit=True): + await db.session.execute(text(f"DROP TABLE IF EXISTS {table_name}")) + + +@pytest.mark.asyncio +async def test_gather_on_same_session_raises_isce(db, table_name, setup_table): + """asyncio.gather() on a single AsyncSession must raise isce on asyncpg.""" + count_stmt = text(f"SELECT COUNT(*) FROM {table_name}") + rows_stmt = text(f"SELECT id, name FROM {table_name} LIMIT 10") + + with pytest.raises(Exception) as exc_info: + async with db(): + await asyncio.gather( + db.session.execute(count_stmt), + db.session.execute(rows_stmt), + ) + + error_msg = str(exc_info.value).lower() + assert ( + "concurrent operations are not permitted" in error_msg + or "provisioning a new connection" in error_msg + or "isce" in error_msg + ), f"Expected isce error, got: {exc_info.value!r}" + + +@pytest.mark.asyncio +async def test_db_gather_multi_sessions_avoids_isce(db, table_name, setup_table): + """db.gather() in multi_sessions mode gives each task its own session β€” no isce.""" + count_stmt = text(f"SELECT COUNT(*) FROM {table_name}") + rows_stmt = text(f"SELECT id, name FROM {table_name} LIMIT 10") + + async def get_count(): + result = await db.session.execute(count_stmt) + return result.scalar() + + async def get_rows(): + result = await db.session.execute(rows_stmt) + return result.fetchall() + + async with db(multi_sessions=True, max_concurrent=2): + count, rows = await db.gather(get_count(), get_rows()) + + assert count == 50 + assert len(rows) == 10 + + +@pytest.mark.asyncio +async def test_sequential_execute_on_same_session_works(db, table_name, setup_table): + """Sequential awaits on the same session never trigger isce.""" + async with db(): + count_result = await db.session.execute(text(f"SELECT COUNT(*) FROM {table_name}")) + rows_result = await db.session.execute(text(f"SELECT id FROM {table_name} LIMIT 5")) + + assert count_result.scalar() == 50 + assert len(rows_result.fetchall()) == 5 From 17c1f45b0ebacf3810c27b017f8c2633312e76e1 Mon Sep 17 00:00:00 2001 From: Eugene Shershen Date: Sun, 14 Jun 2026 21:43:35 +0300 Subject: [PATCH 2/4] fix lint --- .gitignore | 1 + tests/test_concurrent_queries_postgres.py | 5 ++++- 2 files changed, 5 insertions(+), 1 deletion(-) diff --git a/.gitignore b/.gitignore index 4409b7a..7537e16 100644 --- a/.gitignore +++ b/.gitignore @@ -17,3 +17,4 @@ CLAUDE.md .DS_Store .kiro/ test.db +openspec/ diff --git a/tests/test_concurrent_queries_postgres.py b/tests/test_concurrent_queries_postgres.py index c640dca..b4c9ed3 100644 --- a/tests/test_concurrent_queries_postgres.py +++ b/tests/test_concurrent_queries_postgres.py @@ -46,7 +46,10 @@ async def setup_table(app, db, SQLAlchemyMiddleware, table_name): text(f"CREATE TABLE {table_name} (id SERIAL PRIMARY KEY, name TEXT NOT NULL)") ) await db.session.execute( - text(f"INSERT INTO {table_name} (name) SELECT 'row_' || g FROM generate_series(1, 50) g") + text( + f"INSERT INTO {table_name} (name) " + "SELECT 'row_' || g FROM generate_series(1, 50) g" + ) ) try: yield From abf657473cc4c7deba7e5c4b2cbe48700dc1043b Mon Sep 17 00:00:00 2001 From: Eugene Shershen Date: Sun, 14 Jun 2026 23:31:23 +0300 Subject: [PATCH 3/4] docs --- .github/workflows/docs.yml | 60 ++++++++++ .gitignore | 1 + README.md | 3 + docs/api-reference.md | 202 +++++++++++++++++++++++++++++++++ docs/assets/extra.css | 138 ++++++++++++++++++++++ docs/assets/favicon.svg | 8 ++ docs/faq.md | 193 +++++++++++++++++++++++++++++++ docs/getting-started.md | 117 +++++++++++++++++++ docs/guide/concurrency.md | 127 +++++++++++++++++++++ docs/guide/engine-lifecycle.md | 101 +++++++++++++++++ docs/guide/events.md | 85 ++++++++++++++ docs/guide/index.md | 87 ++++++++++++++ docs/guide/multi-database.md | 102 +++++++++++++++++ docs/guide/sessions.md | 137 ++++++++++++++++++++++ docs/guide/streaming.md | 102 +++++++++++++++++ docs/guide/type-hints.md | 91 +++++++++++++++ docs/index.md | 126 ++++++++++++++++++++ mkdocs.yml | 124 ++++++++++++++++++++ requirements-docs.txt | 2 + 19 files changed, 1806 insertions(+) create mode 100644 .github/workflows/docs.yml create mode 100644 docs/api-reference.md create mode 100644 docs/assets/extra.css create mode 100644 docs/assets/favicon.svg create mode 100644 docs/faq.md create mode 100644 docs/getting-started.md create mode 100644 docs/guide/concurrency.md create mode 100644 docs/guide/engine-lifecycle.md create mode 100644 docs/guide/events.md create mode 100644 docs/guide/index.md create mode 100644 docs/guide/multi-database.md create mode 100644 docs/guide/sessions.md create mode 100644 docs/guide/streaming.md create mode 100644 docs/guide/type-hints.md create mode 100644 docs/index.md create mode 100644 mkdocs.yml create mode 100644 requirements-docs.txt diff --git a/.github/workflows/docs.yml b/.github/workflows/docs.yml new file mode 100644 index 0000000..e896e3d --- /dev/null +++ b/.github/workflows/docs.yml @@ -0,0 +1,60 @@ +name: docs + +on: + push: + branches: + - main + paths: + - "docs/**" + - "mkdocs.yml" + - "requirements-docs.txt" + - ".github/workflows/docs.yml" + workflow_dispatch: + +# Allow the GITHUB_TOKEN to deploy to GitHub Pages. +permissions: + contents: read + pages: write + id-token: write + +# Allow one concurrent deployment; don't cancel an in-progress production deploy. +concurrency: + group: pages + cancel-in-progress: false + +jobs: + build: + runs-on: ubuntu-latest + steps: + - name: Checkout + uses: actions/checkout@v4 + with: + fetch-depth: 0 # so `git-revision-date` style plugins can read history + + - name: Set up Python + uses: actions/setup-python@v5 + with: + python-version: "3.12" + cache: pip + + - name: Install documentation dependencies + run: pip install -r requirements-docs.txt + + - name: Build site (strict) + run: mkdocs build --strict + + - name: Upload Pages artifact + uses: actions/upload-pages-artifact@v3 + with: + path: site + + deploy: + needs: build + runs-on: ubuntu-latest + environment: + name: github-pages + url: ${{ steps.deployment.outputs.page_url }} + steps: + - name: Deploy to GitHub Pages + id: deployment + uses: actions/deploy-pages@v4 diff --git a/.gitignore b/.gitignore index 7537e16..358d694 100644 --- a/.gitignore +++ b/.gitignore @@ -1,5 +1,6 @@ dist/ build/ +site/ .vscode/ venv/ .idea/ diff --git a/README.md b/README.md index cb3f4f4..829a1f1 100644 --- a/README.md +++ b/README.md @@ -7,11 +7,14 @@ [![pip](https://img.shields.io/pypi/v/fastapi_async_sqlalchemy?color=blue)](https://pypi.org/project/fastapi-async-sqlalchemy/) [![Downloads](https://static.pepy.tech/badge/fastapi-async-sqlalchemy)](https://pepy.tech/project/fastapi-async-sqlalchemy) [![Updates](https://pyup.io/repos/github/h0rn3t/fastapi-async-sqlalchemy/shield.svg)](https://pyup.io/repos/github/h0rn3t/fastapi-async-sqlalchemy/) +[![Docs](https://img.shields.io/badge/docs-mkdocs--material-0e8a8a)](https://h0rn3t.github.io/fastapi-async-sqlalchemy/) ### Description Provides SQLAlchemy middleware for FastAPI using AsyncSession and async engine. +πŸ“– **Full documentation: ** + ### Install ```bash diff --git a/docs/api-reference.md b/docs/api-reference.md new file mode 100644 index 0000000..98e51b6 --- /dev/null +++ b/docs/api-reference.md @@ -0,0 +1,202 @@ +# API Reference + +Everything the package exports from `fastapi_async_sqlalchemy`. + +```python +from fastapi_async_sqlalchemy import ( + SQLAlchemyMiddleware, + db, + create_middleware_and_session_proxy, + DBSessionMeta, + DBSessionType, +) +``` + +--- + +## `SQLAlchemyMiddleware` + +ASGI middleware that opens a request-scoped `AsyncSession` and finalizes it when +the response is sent. Add it with `app.add_middleware(...)`. + +```python +app.add_middleware( + SQLAlchemyMiddleware, + db_url=None, + custom_engine=None, + engine_args=None, + session_args=None, + commit_on_exit=False, +) +``` + +**Parameters** + +| Name | Type | Default | Description | +| ---------------- | ------------------------ | ------- | ------------------------------------------------------------------------------------------- | +| `db_url` | `str \| URL \| None` | `None` | Database URL. The middleware creates and **owns** the engine. Mutually exclusive with `custom_engine`. | +| `custom_engine` | `AsyncEngine \| None` | `None` | A pre-built engine you **own**. The middleware uses it but never disposes it. | +| `engine_args` | `dict \| None` | `None` | Forwarded to `create_async_engine` (only when `db_url` is used). E.g. `pool_size`, `echo`. | +| `session_args` | `dict \| None` | `None` | Forwarded to `async_sessionmaker`. | +| `commit_on_exit` | `bool` | `False` | Commit the request session on a clean exit. See [Sessions](guide/sessions.md#commit-on-exit). | + +!!! warning "Exactly one engine source" + Passing neither `db_url` nor `custom_engine` raises `ValueError`. Binding a + proxy that is already bound to a different live engine raises `RuntimeError` + β€” use [`create_middleware_and_session_proxy()`](#create_middleware_and_session_proxy) + for additional databases. + +### `await middleware.dispose()` + +Dispose the middleware-owned engine. No-op for a `custom_engine`. Idempotent on +success and safe to retry on failure. Call it manually when running outside an +ASGI lifespan. See [Engine Lifecycle](guide/engine-lifecycle.md#manual-disposal-outside-a-lifespan). + +--- + +## `db` + +The global session proxy. Its public API lives on its metaclass; annotate it +with [`DBSessionMeta`](#dbsessionmeta-dbsessiontype). + +### `db.session` + +: **Property** β†’ `AsyncSession` + + The session bound to the current async context. + + - In `multi_sessions` mode, returns the calling task's own session. + - Raises [`SessionNotInitialisedError`](#exceptions) if no middleware has + been constructed yet. + - Raises [`MissingSessionError`](#exceptions) if there is no active request + or `async with db()` context. + +### `db(...)` + +: **Call** β†’ async context manager + + Open an explicit session context. + + ```python + db( + session_args=None, + commit_on_exit=False, + multi_sessions=False, + max_concurrent=None, + ) + ``` + + | Name | Type | Default | Description | + | ---------------- | ------------------- | ------- | ----------------------------------------------------------------- | + | `session_args` | `dict \| None` | `None` | Per-context overrides for the sessionmaker. | + | `commit_on_exit` | `bool` | `False` | Commit on a clean exit of the block. | + | `multi_sessions` | `bool` | `False` | Give each task its own session. See [Concurrency](guide/concurrency.md). | + | `max_concurrent` | `int \| None` | `None` | Cap simultaneous sessions. Must be `>= 1`, else `ValueError`. | + + ```python + async with db(commit_on_exit=True): + db.session.add(obj) + ``` + +### `db.connection()` + +: **Method** β†’ async context manager yielding `AsyncSession` + + Throttled session access for `multi_sessions` mode. Waits for a semaphore + slot (when `max_concurrent` is set) before creating a session, then releases + it on exit. Without `max_concurrent` it simply creates and cleans up a + session. + + ```python + async with db(multi_sessions=True, max_concurrent=10): + async with db.connection() as session: + await session.execute(text("SELECT 1")) + ``` + +### `await db.gather(*coros, return_exceptions=False)` + +: **Coroutine** β†’ `list` + + Pool-aware drop-in for `asyncio.gather`. Each coroutine acquires a slot (and + a session) before running and releases it afterwards, so no more than + `max_concurrent` connections are in use at once. + + - Accepts **coroutine objects only** when `max_concurrent` is set; + pre-created `Task`/`Future` inputs raise `TypeError`. + - With no `max_concurrent`, delegates directly to `asyncio.gather`. + + ```python + async with db(multi_sessions=True, max_concurrent=10): + results = await db.gather(*(work(i) for i in range(100))) + ``` + +--- + +## `create_middleware_and_session_proxy()` + +```python +create_middleware_and_session_proxy() -> tuple[type, DBSessionMeta] +``` + +Build a fresh, fully isolated `(middleware_class, db_proxy)` pair with its own +`ContextVar` state and engine binding. Use one pair per independent app or +database. + +```python +FirstMiddleware, first_db = create_middleware_and_session_proxy() +SecondMiddleware, second_db = create_middleware_and_session_proxy() +``` + +The package's default exports are created exactly this way: + +```python +SQLAlchemyMiddleware, db = create_middleware_and_session_proxy() +``` + +See [Multiple Databases](guide/multi-database.md). + +--- + +## `DBSessionMeta` / `DBSessionType` + +A structural `Protocol` (at type-check time) and the runtime metaclass of `db` +(at runtime). Use as an annotation for the `db` proxy. `DBSessionType` is an +alias of `DBSessionMeta`. See [Type Hints](guide/type-hints.md). + +```python +def get_db() -> DBSessionMeta: + return db +``` + +--- + +## Exceptions + +Both live in `fastapi_async_sqlalchemy` (and `fastapi_async_sqlalchemy.exceptions`). + +### `MissingSessionError` + +Raised when `db.session` is accessed with **no active session context** β€” you're +not in a request and haven't opened `async with db()`. + +```python +async with db(): + await db.session.execute(foo.select()) # βœ… fix +``` + +### `SessionNotInitialisedError` + +Raised when `db.session` is accessed before any `SQLAlchemyMiddleware` has been +**constructed**, so the sessionmaker doesn't exist yet. Ensure the middleware is +added during app setup. + +--- + +## Version + +The installed version is available as: + +```python +import fastapi_async_sqlalchemy +fastapi_async_sqlalchemy.__version__ +``` diff --git a/docs/assets/extra.css b/docs/assets/extra.css new file mode 100644 index 0000000..cac359f --- /dev/null +++ b/docs/assets/extra.css @@ -0,0 +1,138 @@ +/* ========================================================================== + FastAPI Async SQLAlchemy β€” custom theme layer on top of Material + ========================================================================== */ + +:root { + --fas-teal: #0e8a8a; + --fas-teal-bright: #12b3b3; + --fas-ink: #0b2027; +} + +/* Brand color tuning for the teal primary palette */ +[data-md-color-primary="teal"] { + --md-primary-fg-color: var(--fas-teal); + --md-primary-fg-color--light: var(--fas-teal-bright); + --md-primary-fg-color--dark: #0a6b6b; +} + +/* -------------------------------------------------------------------------- + Hero banner (home page) + -------------------------------------------------------------------------- */ +.fas-hero { + margin: -1.2rem -0.8rem 2.5rem; + padding: 4rem 1.5rem 3.5rem; + text-align: center; + color: #fff; + background: + radial-gradient(1200px 400px at 50% -10%, rgba(255, 255, 255, 0.18), transparent 60%), + linear-gradient(135deg, #0a6b6b 0%, #0e8a8a 45%, #12b3b3 100%); + border-radius: 0 0 24px 24px; + box-shadow: inset 0 -1px 0 rgba(0, 0, 0, 0.08); +} + +.fas-hero h1 { + font-size: 2.7rem; + font-weight: 800; + line-height: 1.1; + margin: 0 0 0.6rem; + color: #fff; + letter-spacing: -0.02em; +} + +.fas-hero p { + max-width: 40rem; + margin: 0 auto 1.6rem; + font-size: 1.05rem; + line-height: 1.6; + color: rgba(255, 255, 255, 0.92); +} + +.fas-hero .fas-cta { + display: inline-flex; + flex-wrap: wrap; + gap: 0.7rem; + justify-content: center; +} + +.fas-hero .md-button { + border-radius: 999px; + font-weight: 600; +} + +.fas-hero .md-button--primary { + background: #fff; + color: var(--fas-teal) !important; + border-color: #fff; +} + +.fas-hero .md-button:not(.md-button--primary) { + color: #fff; + border-color: rgba(255, 255, 255, 0.7); +} + +.fas-hero .md-button:not(.md-button--primary):hover { + background: rgba(255, 255, 255, 0.14); + border-color: #fff; +} + +.fas-badges { + margin-top: 1.6rem; + display: flex; + gap: 0.4rem; + flex-wrap: wrap; + justify-content: center; +} + +.fas-badges img { + height: 20px; +} + +/* -------------------------------------------------------------------------- + Feature cards (Material "grid cards" enhancement) + -------------------------------------------------------------------------- */ +.md-typeset .grid.cards > ul > li, +.md-typeset .grid.cards > :is(ul, ol) > li { + border-radius: 14px; + border: 1px solid var(--md-default-fg-color--lightest); + transition: border-color 160ms ease, box-shadow 160ms ease, transform 160ms ease; +} + +.md-typeset .grid.cards > ul > li:hover { + border-color: var(--fas-teal-bright); + box-shadow: 0 10px 28px -16px rgba(14, 138, 138, 0.55); + transform: translateY(-2px); +} + +.md-typeset .grid.cards .twemoji, +.md-typeset .grid.cards .lg { + color: var(--fas-teal); +} + +/* -------------------------------------------------------------------------- + Code & inline polish + -------------------------------------------------------------------------- */ +.md-typeset :not(pre) > code { + border-radius: 5px; +} + +.md-typeset .admonition, +.md-typeset details { + border-radius: 10px; +} + +/* Tables breathe a little more */ +.md-typeset table:not([class]) { + border-radius: 10px; + overflow: hidden; +} + +/* Footer / nav tab accent */ +.md-tabs { + box-shadow: inset 0 -1px 0 rgba(255, 255, 255, 0.12); +} + +@media screen and (max-width: 44.9em) { + .fas-hero h1 { + font-size: 2rem; + } +} diff --git a/docs/assets/favicon.svg b/docs/assets/favicon.svg new file mode 100644 index 0000000..aba2d08 --- /dev/null +++ b/docs/assets/favicon.svg @@ -0,0 +1,8 @@ + + + + + + + + diff --git a/docs/faq.md b/docs/faq.md new file mode 100644 index 0000000..8ed5b76 --- /dev/null +++ b/docs/faq.md @@ -0,0 +1,193 @@ +# FAQ & Troubleshooting + +Common errors, what they mean, and how to fix them. + +## `MissingSessionError` + +> No session found! Either you are not currently in a request context, or you +> need to manually create a session context… + +You accessed `db.session` outside any session context. This happens in startup +hooks, scripts, background tasks, or tests. + +**Fix** β€” wrap the access: + +```python +async with db(): + result = await db.session.execute(foo.select()) +``` + +See [Sessions & Contexts](guide/sessions.md#outside-a-request-async-with-db). + +--- + +## `SessionNotInitialisedError` + +> Session not initialised! Ensure that DBSessionMiddleware has been initialised… + +You accessed `db.session` before any `SQLAlchemyMiddleware` was constructed, so +the sessionmaker doesn't exist yet. + +**Fix** β€” make sure the middleware is added during app setup, before the access +runs: + +```python +app.add_middleware(SQLAlchemyMiddleware, db_url="...") +``` + +In tests, construct the middleware (or run the app lifespan) before touching +`db.session`. + +--- + +## `TimeoutError: QueuePool limit ... reached` + +You launched more concurrent sessions than your pool can serve. + +**Fix** β€” throttle with `max_concurrent` and route child sessions through +`db.gather()` or `db.connection()`: + +```python +async with db(multi_sessions=True, max_concurrent=10): + results = await db.gather(*(work(i) for i in range(1000))) +``` + +See [Concurrent Queries](guide/concurrency.md#throttling-with-max_concurrent). + +--- + +## `InvalidRequestError: ... concurrent operations are not permitted` + +Two coroutines used the **same** `AsyncSession` at once (e.g. `asyncio.gather` +over `db.session`). A session can only do one thing at a time. + +**Fix** β€” give each task its own session with `multi_sessions=True`: + +```python +async with db(multi_sessions=True): + async def worker(n): + return await db.session.execute(text(f"SELECT {n}")) + await asyncio.gather(*(worker(i) for i in range(5))) +``` + +--- + +## `TypeError` from `db.gather()` + +> When `max_concurrent` is set, db.gather() accepts coroutine objects only… + +You passed a `Task` or `Future` to `db.gather()` while `max_concurrent` is set. +Those may already be running outside the semaphore. + +**Fix** β€” pass **coroutine objects**, not tasks: + +```python +# ❌ await db.gather(*[asyncio.create_task(work(i)) for i in range(10)]) +# βœ… +await db.gather(*(work(i) for i in range(10))) +``` + +Or manage your own tasks with `db.connection()` inside each. + +--- + +## `RuntimeError: ... child tasks must access DB via db.connection() or db.gather()` + +With `max_concurrent` set, a **child task** accessed `db.session` directly. That +path isn't throttled. + +**Fix** β€” open the session through `db.connection()` (or use `db.gather()`): + +```python +async with db.connection() as session: + await session.execute(text("SELECT 1")) +``` + +The **parent** task may still use `db.session` directly. + +--- + +## `RuntimeError: ... commit_on_exit=True cannot use the ... session with a streaming response` + +You returned a `StreamingResponse` while `commit_on_exit=True` and the request +session had already been used. + +**Fix** β€” own the database lifetime inside the generator, or commit before +streaming. See [Streaming Responses](guide/streaming.md). + +```python +async def body(): + async with db(): + result = await db.session.stream(foo.select()) + async for row in result: + yield serialize(row) +``` + +--- + +## `RuntimeError: This SQLAlchemy session proxy is already bound to another live engine` + +You added two middlewares built from the **same** proxy with different engines, +or reused the default pair for a second database. + +**Fix** β€” create an independent pair per database: + +```python +from fastapi_async_sqlalchemy import create_middleware_and_session_proxy + +SecondMiddleware, second_db = create_middleware_and_session_proxy() +``` + +See [Multiple Databases](guide/multi-database.md#one-proxy-one-live-engine). + +--- + +## Connections aren't released on shutdown + +If you constructed `SQLAlchemyMiddleware(db_url=...)` outside an ASGI lifespan +(a script or ad-hoc harness), nothing triggers disposal. + +**Fix** β€” dispose manually: + +```python +middleware = SQLAlchemyMiddleware(app, db_url="...") +try: + ... +finally: + await middleware.dispose() +``` + +For a `custom_engine`, **you** own disposal: `await engine.dispose()`. See +[Engine Lifecycle](guide/engine-lifecycle.md). + +--- + +## Graceful shutdown hangs + +Engine disposal runs before the lifespan shutdown ack is forwarded, so draining +a stuck pool blocks shutdown. + +**Fix** β€” set your ASGI server's graceful-shutdown timeout to cover the +worst-case connection close time, e.g. uvicorn: + +```bash +uvicorn main:app --timeout-graceful-shutdown 30 +``` + +--- + +## Does it work with SQLModel? + +Yes. If `sqlmodel` is installed, the middleware uses its `AsyncSession` subclass +automatically β€” no configuration needed. + +--- + +## Can I run the docs locally? + +```bash +pip install -r requirements-docs.txt +mkdocs serve +``` + +Then open . diff --git a/docs/getting-started.md b/docs/getting-started.md new file mode 100644 index 0000000..f2cdff9 --- /dev/null +++ b/docs/getting-started.md @@ -0,0 +1,117 @@ +# Getting Started + +This page takes you from an empty project to a running FastAPI app backed by an +async SQLAlchemy session in a few minutes. + +## Requirements + +- **Python** 3.12 or newer +- **starlette** β‰₯ 0.40 and **SQLAlchemy** β‰₯ 2.0 (installed automatically) +- An **async database driver** for your engine + +| Database | Driver | Example URL | +| ---------- | ------------ | ------------------------------------------------------- | +| PostgreSQL | `asyncpg` | `postgresql+asyncpg://user:pass@localhost:5432/app` | +| MySQL | `aiomysql` | `mysql+aiomysql://user:pass@localhost:3306/app` | +| SQLite | `aiosqlite` | `sqlite+aiosqlite:///./app.db` | + +## Install + +```bash +pip install fastapi-async-sqlalchemy + +# plus a driver, e.g. PostgreSQL +pip install asyncpg +``` + +It also works out of the box with [`sqlmodel`](https://sqlmodel.tiangolo.com/) β€” +if `sqlmodel` is installed, its `AsyncSession` subclass is used automatically. + +## Your first app + +```python title="main.py" +from fastapi import FastAPI +from fastapi_async_sqlalchemy import SQLAlchemyMiddleware, db +from sqlalchemy import column, table + +app = FastAPI() + +app.add_middleware( + SQLAlchemyMiddleware, + db_url="sqlite+aiosqlite:///./app.db", +) + +# a lightweight table reference for the example +files = table("ms_files", column("id")) + + +@app.get("/files") +async def list_files(): + result = await db.session.execute(files.select()) + return result.fetchall() +``` + +Run it: + +```bash +uvicorn main:app --reload +``` + +Open . The middleware opened a session for the +request, made it available as `db.session`, and closed it when the response was +sent. + +!!! tip "That's the whole idea" + You never created or passed a session. `db.session` is bound to the current + request via a `ContextVar`, so it resolves correctly even from helper + functions called deep inside the request. + +## Configuring the engine + +Pass engine and session options through `engine_args` / `session_args`. These +are forwarded verbatim to SQLAlchemy's +[`create_async_engine`](https://docs.sqlalchemy.org/en/20/orm/extensions/asyncio.html) +and `async_sessionmaker`. + +```python +app.add_middleware( + SQLAlchemyMiddleware, + db_url="postgresql+asyncpg://user:pass@localhost:5432/app", + engine_args={ + "echo": True, # log every SQL statement + "pool_pre_ping": True, # validate connections before use + "pool_size": 5, # connections kept open + "max_overflow": 10, # extra connections allowed above pool_size + }, + commit_on_exit=True, # commit the request session on a clean exit +) +``` + +## Using the session outside a request + +Outside the request/response cycle (startup hooks, scripts, workers) there is no +middleware to open a session for you. Open one explicitly with the `db` context +manager: + +```python +async def warm_cache(): + async with db(): + result = await db.session.execute(files.select()) + return result.fetchall() + + +@app.on_event("startup") +async def on_startup(): + await warm_cache() +``` + +## Next steps + +
+ +- :material-database-sync: [__Sessions & contexts__](guide/sessions.md) β€” how `db.session` and `async with db()` behave +- :material-engine: [__Engine lifecycle__](guide/engine-lifecycle.md) β€” ownership, disposal, graceful shutdown +- :material-arrow-decision: [__Concurrent queries__](guide/concurrency.md) β€” run parallel queries safely +- :material-book-open-variant: [__API reference__](api-reference.md) β€” every public symbol + +
diff --git a/docs/guide/concurrency.md b/docs/guide/concurrency.md new file mode 100644 index 0000000..4bf7c97 --- /dev/null +++ b/docs/guide/concurrency.md @@ -0,0 +1,127 @@ +# Concurrent Queries + +A single `AsyncSession` cannot run two operations at once β€” concurrent use +raises SQLAlchemy's `InvalidRequestError: This session is provisioning a new +connection; concurrent operations are not permitted`. To run queries in +**parallel** you need a session **per task**, and you need to keep the number of +simultaneous sessions under your connection-pool limit. + +`multi_sessions` mode solves both problems. + +## The problem with sharing one session + +```python +# ❌ Don't do this β€” all tasks share the one request session +async def bad(): + await asyncio.gather( + db.session.execute(text("SELECT 1")), + db.session.execute(text("SELECT 2")), # concurrent op on same session + ) +``` + +## `multi_sessions=True` + +Opening `db(multi_sessions=True)` switches `db.session` to give **each task its +own session**, tracked and cleaned up by the middleware: + +```python +import asyncio +from sqlalchemy import text + +async def run(): + async with db(multi_sessions=True): + async def worker(n: int): + # each task gets a distinct session + return await db.session.execute(text(f"SELECT {n}")) + + await asyncio.gather(*(worker(i) for i in range(5))) +``` + +Child task sessions are committed/rolled back and closed for you as each task +finishes. All child tasks **must complete before the `async with` block exits**. + +## Throttling with `max_concurrent` + +Unbounded parallelism will exhaust the pool and raise +`TimeoutError: QueuePool limit ... reached`. Set `max_concurrent` to cap the +number of sessions holding a connection at once. When you do, child tasks must +acquire their session through **`db.connection()`** or **`db.gather()`** so the +middleware owns both the session lifetime and the semaphore slot. + +### `db.gather()` β€” the easy path + +A drop-in, pool-aware replacement for `asyncio.gather`. Each coroutine acquires +a slot (and a session) before it runs and releases it afterwards: + +```python +async def do_work(n: int) -> int: + async with db.connection() as session: + result = await session.execute(text(f"SELECT {n}")) + return result.scalar_one() + + +async def run(): + async with db(multi_sessions=True, max_concurrent=10): + results = await db.gather(*(do_work(i) for i in range(100))) + # never more than 10 connections in flight +``` + +!!! warning "Pass coroutines, not Tasks/Futures" + When `max_concurrent` is set, `db.gather()` accepts **coroutine objects + only**. A pre-created `Task` or `Future` may already be running outside the + semaphore, so it is rejected with `TypeError`. Pass `do_work(i)`, not + `asyncio.create_task(do_work(i))`. + +### `db.connection()` β€” explicit slots + +When you create your own tasks, open the session inside each task with +`db.connection()`. It waits for a free slot before creating the session and +releases it when the block exits: + +```python +async def run(): + async with db(multi_sessions=True, max_concurrent=10): + async def execute_query(query: str): + async with db.connection() as session: + return await session.execute(text(query)) + + tasks = [ + asyncio.create_task(execute_query(f"SELECT {i}")) + for i in range(50) + ] + await asyncio.gather(*tasks) +``` + +Without `max_concurrent`, `db.connection()` still works β€” it just creates a +session without throttling and cleans it up on exit. + +## Rules to remember + +- Child tasks that use the database **must finish before** the owning + `async with db(multi_sessions=True)` block exits. Tasks still parked on the + semaphore (or still running) when the block starts closing are cancelled. +- With `max_concurrent` set, **direct `db.session` access from a child task is + rejected** β€” it isn't throttled. Use `db.connection()` or `db.gather()` + instead. (The parent task may still use `db.session` directly.) +- Creating a new `db.connection()` session **after** the context has begun + closing raises `RuntimeError`. +- `max_concurrent` must be `>= 1`, otherwise `ValueError` is raised. + +## Choosing an approach + +```mermaid +flowchart TD + A[Need parallel DB work?] -->|no| B[Use db.session directly] + A -->|yes| C{Bounded by pool?} + C -->|"just a few tasks"| D["db(multi_sessions=True)"] + C -->|"many tasks"| E["db(multi_sessions=True, max_concurrent=N)"] + E --> F{Own your tasks?} + F -->|no, pass coroutines| G["db.gather(...)"] + F -->|yes, create_task| H["db.connection() inside each task"] +``` + +| Scenario | Use | +| ------------------------------------------ | ---------------------------------------------- | +| A handful of parallel queries | `db(multi_sessions=True)` + `db.session` | +| Many queries, cap connections, simplest | `db(multi_sessions=True, max_concurrent=N)` + `db.gather()` | +| Many queries, you manage the tasks | `db(multi_sessions=True, max_concurrent=N)` + `db.connection()` | diff --git a/docs/guide/engine-lifecycle.md b/docs/guide/engine-lifecycle.md new file mode 100644 index 0000000..c593105 --- /dev/null +++ b/docs/guide/engine-lifecycle.md @@ -0,0 +1,101 @@ +# Engine Lifecycle + +The middleware can either **own** the SQLAlchemy async engine or **borrow** one +you created. Ownership decides who disposes the connection pool β€” and getting +this wrong leaks connections on shutdown. This page makes the rules explicit. + +## Two ways to provide an engine + +=== "Middleware owns the engine (`db_url`)" + + ```python + app.add_middleware( + SQLAlchemyMiddleware, + db_url="postgresql+asyncpg://user:pass@localhost/app", + engine_args={"pool_size": 5, "max_overflow": 10}, + ) + ``` + + The middleware calls `create_async_engine(db_url, **engine_args)`, **owns** + the result, and disposes it during ASGI lifespan shutdown. + +=== "You own the engine (`custom_engine`)" + + ```python + from sqlalchemy.ext.asyncio import create_async_engine + + engine = create_async_engine("postgresql+asyncpg://user:pass@localhost/app") + app.add_middleware(SQLAlchemyMiddleware, custom_engine=engine) + + # later, in your own shutdown / test cleanup: + await engine.dispose() + ``` + + The middleware uses your engine but **never disposes it**. Disposal is your + responsibility. + +!!! note "One of the two is required" + You must pass either `db_url` or `custom_engine`. Passing neither raises + `ValueError`. + +## Disposal during ASGI lifespan + +For a middleware-owned engine (`db_url`), disposal is automatic. It happens when +the lifespan ends β€” including failure paths β€” so a raising shutdown handler +can't leak the pool: + +- `lifespan.shutdown.complete` +- `lifespan.shutdown.failed` +- `lifespan.startup.failed` + +The engine is disposed **once**, for the application lifetime β€” not per request. + +```python +from fastapi.testclient import TestClient + +# Running the lifespan (e.g. via TestClient context) triggers disposal: +with TestClient(app): + ... +# <- engine disposed here +``` + +!!! warning "Disposal blocks the shutdown ack" + Engine disposal runs **before** the lifespan acknowledgement is forwarded to + the ASGI server, so a slow pool drain delays graceful shutdown. Configure + your server's graceful-shutdown timeout (e.g. uvicorn's + `--timeout-graceful-shutdown`) to cover the worst-case time to close active + connections. + +## Manual disposal outside a lifespan + +When you build `SQLAlchemyMiddleware(db_url=...)` **outside** an ASGI lifespan β€” +a script, an ad-hoc harness, a non-ASGI runtime β€” there is no +`lifespan.shutdown` event, so nothing triggers disposal. Call `dispose()` +yourself: + +```python +middleware = SQLAlchemyMiddleware(app, db_url="postgresql+asyncpg://...") +try: + ... # use db.session +finally: + await middleware.dispose() +``` + +`dispose()` is: + +- **Idempotent on success** β€” calling it again is a no-op. +- **Safe to retry on failure** β€” the proxy's session bindings are cleared + deterministically, so a later call actually re-attempts `engine.dispose()` + rather than silently no-op'ing on a half-disposed engine. +- **A no-op for borrowed engines** β€” if you passed `custom_engine`, `dispose()` + does nothing; you own that engine. + +The same guidance applies to each pair returned by +[`create_middleware_and_session_proxy()`](multi-database.md). + +## Summary + +| You pass | Engine created by | Disposed by | When | +| ---------------- | ----------------- | ------------------------------- | -------------------------------------- | +| `db_url` | the middleware | the middleware | lifespan shutdown, or `dispose()` | +| `custom_engine` | you | you (`await engine.dispose()`) | whenever your own cleanup runs | diff --git a/docs/guide/events.md b/docs/guide/events.md new file mode 100644 index 0000000..70ba4bd --- /dev/null +++ b/docs/guide/events.md @@ -0,0 +1,85 @@ +# SQLAlchemy Events + +SQLAlchemy's [event system](https://docs.sqlalchemy.org/en/20/orm/events.html) +is **independent of the session and engine**. This middleware doesn't change how +events fire β€” register listeners on your mapped classes (or on `Mapper` / +`Session`) with `sqlalchemy.event.listens_for` exactly as you would in a +synchronous SQLAlchemy setup. + +## Registering listeners + +```python +from datetime import datetime +from sqlalchemy import Column, DateTime, Integer, String, event +from sqlalchemy.orm import DeclarativeBase + + +class Base(DeclarativeBase): + pass + + +class User(Base): + __tablename__ = "users" + id = Column(Integer, primary_key=True) + username = Column(String(50), unique=True, nullable=False) + created_at = Column(DateTime, default=datetime.utcnow) + updated_at = Column(DateTime, default=datetime.utcnow) + + +@event.listens_for(User, "before_insert") +def normalize(mapper, connection, target): + target.username = target.username.lower().strip() + + +@event.listens_for(User, "before_update") +def touch_updated_at(mapper, connection, target): + target.updated_at = datetime.utcnow() + + +@event.listens_for(User, "after_insert") +def log_insert(mapper, connection, target): + print(f"user created: id={target.id}") +``` + +These fire when the session flushes, just like always. + +## :warning: Mapper events are synchronous + +Mapper-level events receive a **synchronous** `connection` argument: + +`before_insert` Β· `after_insert` Β· `before_update` Β· `after_update` Β· +`before_delete` Β· `after_delete` + +Inside these handlers: + +- **Do not** `await` anything. +- **Do not** call async ORM APIs. + +```python +@event.listens_for(User, "before_insert") +def handler(mapper, connection, target): + # βœ… pure, synchronous work on `target` + target.username = target.username.strip().lower() + # ❌ await db.session.execute(...) <- not allowed here +``` + +If you need async work after a write, do it **after** `await +db.session.commit()` returns, or use `Session`-level events such as +`after_flush` / `after_commit` and schedule the async work from there. + +## A complete example + +A runnable example with validation, automatic timestamps, audit logging, and a +commented-out soft-delete hook lives in the repository at +[`examples/events_example.py`](https://github.com/h0rn3t/fastapi-async-sqlalchemy/blob/main/examples/events_example.py). +It wires the listeners above into a small FastAPI CRUD app: + +```python +@app.post("/users") +async def create_user(username: str, email: str, full_name: str | None = None): + async with db(): + user = User(username=username, email=email, full_name=full_name) + db.session.add(user) + await db.session.commit() # before_insert / after_insert fire on flush + return {"id": user.id, "username": user.username} +``` diff --git a/docs/guide/index.md b/docs/guide/index.md new file mode 100644 index 0000000..a682eab --- /dev/null +++ b/docs/guide/index.md @@ -0,0 +1,87 @@ +# User Guide + +The middleware is small but covers several distinct concerns. Pick the topic you +need β€” each page is self-contained. + +
+ +- :material-database-sync:{ .lg .middle } __Sessions & Contexts__ + + --- + + The `db.session` proxy, the `async with db()` context manager, + `commit_on_exit`, and when each session is created and closed. + + [:octicons-arrow-right-24: Read](sessions.md) + +- :material-engine:{ .lg .middle } __Engine Lifecycle__ + + --- + + Who owns the engine (`db_url` vs `custom_engine`), when it is disposed, and + how to dispose it manually outside an ASGI lifespan. + + [:octicons-arrow-right-24: Read](engine-lifecycle.md) + +- :material-arrow-decision:{ .lg .middle } __Concurrent Queries__ + + --- + + `multi_sessions`, `max_concurrent`, `db.gather()` and `db.connection()` β€” + parallel work without blowing past the connection pool. + + [:octicons-arrow-right-24: Read](concurrency.md) + +- :material-download-network:{ .lg .middle } __Streaming Responses__ + + --- + + Why streaming bodies need their own session and how to write one safely. + + [:octicons-arrow-right-24: Read](streaming.md) + +- :material-transit-connection-variant:{ .lg .middle } __Multiple Databases__ + + --- + + `create_middleware_and_session_proxy()` for independent apps or databases. + + [:octicons-arrow-right-24: Read](multi-database.md) + +- :material-bell-ring:{ .lg .middle } __SQLAlchemy Events__ + + --- + + Using `before_insert` / `after_update` and friends with async sessions. + + [:octicons-arrow-right-24: Read](events.md) + +- :material-language-python:{ .lg .middle } __Type Hints__ + + --- + + Annotate `db` with `DBSessionMeta` for full mypy / IDE support. + + [:octicons-arrow-right-24: Read](type-hints.md) + +
+ +## Mental model + +```mermaid +flowchart LR + A[HTTP request] --> B[SQLAlchemyMiddleware] + B -->|opens| C[AsyncSession bound to ContextVar] + C --> D[Route / service code
reads db.session] + D --> E{clean exit?} + E -->|yes + commit_on_exit| F[commit] + E -->|exception| G[rollback] + F --> H[close session] + G --> H + H --> I[response sent] +``` + +The session lives for the duration of the request context. Everything in the +guide is a variation on this: opening extra contexts (`async with db()`), +running many sessions at once (`multi_sessions`), or moving the lifetime into a +streaming body. diff --git a/docs/guide/multi-database.md b/docs/guide/multi-database.md new file mode 100644 index 0000000..568b1f0 --- /dev/null +++ b/docs/guide/multi-database.md @@ -0,0 +1,102 @@ +# Multiple Databases + +The default `SQLAlchemyMiddleware` / `db` pair is bound to **one** engine. To +talk to several independent databases, create a separate middleware/session +proxy pair for each with `create_middleware_and_session_proxy()`. + +## Create one pair per database + +```python title="databases.py" +from fastapi_async_sqlalchemy import create_middleware_and_session_proxy + +FirstSQLAlchemyMiddleware, first_db = create_middleware_and_session_proxy() +SecondSQLAlchemyMiddleware, second_db = create_middleware_and_session_proxy() +``` + +Each call returns an independent `(middleware_class, db_proxy)` tuple with its +own `ContextVar` state and engine binding. + +!!! info "The default pair is just a pre-made instance" + `SQLAlchemyMiddleware, db = create_middleware_and_session_proxy()` is exactly + how the package builds the defaults it exports. Calling it yourself gives you + additional, fully isolated pairs. + +## Wire them into the app + +```python title="main.py" +from fastapi import FastAPI + +from databases import FirstSQLAlchemyMiddleware, SecondSQLAlchemyMiddleware +from routes import router + +app = FastAPI() +app.include_router(router) + +app.add_middleware( + FirstSQLAlchemyMiddleware, + db_url="postgresql+asyncpg://user:pass@localhost:5432/primary_db", + engine_args={"pool_size": 5, "max_overflow": 10}, +) +app.add_middleware( + SecondSQLAlchemyMiddleware, + db_url="mysql+aiomysql://user:pass@localhost:3306/secondary_db", + engine_args={"pool_size": 5, "max_overflow": 10}, +) +``` + +## Use the right proxy in each route + +```python title="routes.py" +from fastapi import APIRouter +from sqlalchemy import column, table + +from databases import first_db, second_db + +router = APIRouter() +files = table("ms_files", column("id")) + + +@router.get("/first-db-files") +async def get_files_from_first_db(): + result = await first_db.session.execute(files.select()) + return result.fetchall() + + +@router.get("/second-db-files") +async def get_files_from_second_db(): + result = await second_db.session.execute(files.select()) + return result.fetchall() +``` + +Each proxy resolves its own request-scoped session, so `first_db.session` and +`second_db.session` never collide. + +## One proxy, one live engine + +A proxy is **bound to a single live engine**. Reusing the same proxy with a +different live engine is rejected: + +```text +RuntimeError: This SQLAlchemy session proxy is already bound to another live +engine. Use create_middleware_and_session_proxy() for independent apps or +databases. +``` + +This guard exists so requests can never silently switch to a different database +binding. The fix is to use a **fresh pair** per app or database β€” exactly what +`create_middleware_and_session_proxy()` is for. + +!!! tip "Rebinding after disposal" + The binding is cleared when the owning middleware's engine is disposed (via + lifespan shutdown or `await middleware.dispose()`). After disposal the proxy + is free to bind a new engine β€” useful in test suites that build and tear down + an app per test. + +## Concurrency still applies per proxy + +Each proxy supports the full [concurrency API](concurrency.md) independently: + +```python +async with first_db(multi_sessions=True, max_concurrent=10): + results = await first_db.gather(*(work(i) for i in range(100))) +``` diff --git a/docs/guide/sessions.md b/docs/guide/sessions.md new file mode 100644 index 0000000..89fd84a --- /dev/null +++ b/docs/guide/sessions.md @@ -0,0 +1,137 @@ +# Sessions & Contexts + +Everything in this library revolves around one idea: **`db.session` is an +`AsyncSession` bound to the current async context.** This page explains how that +session is created, where it lives, and how to open your own contexts. + +## The `db` proxy + +`db` is a global object exported from the package: + +```python +from fastapi_async_sqlalchemy import db +``` + +It exposes a small surface: + +| Member | Kind | Purpose | +| ------------------ | --------------------- | ---------------------------------------------------- | +| `db.session` | property | The `AsyncSession` for the current context | +| `db(...)` | callable | Open an explicit session context manager | +| `db.connection()` | method | Throttled session context manager (multi-session) | +| `db.gather(...)` | coroutine | Pool-aware `asyncio.gather` (multi-session) | + +`db.session` is backed by a [`ContextVar`][contextvar], so each request β€” each +independent async context β€” sees its own session. You never pass it around. + +## Inside a request + +When `SQLAlchemyMiddleware` is installed, every HTTP request gets a session +opened **before** your route runs and finalized **after** it returns: + +```python +@app.get("/items/{item_id}") +async def get_item(item_id: int): + item = await db.session.get(Item, item_id) + return item +``` + +The same session is visible from any function called during the request, no +arguments required: + +```python +async def load_item(item_id: int) -> Item | None: + # same session as the route β€” resolved from the request context + return await db.session.get(Item, item_id) + + +@app.get("/items/{item_id}") +async def get_item(item_id: int): + return await load_item(item_id) +``` + +### Commit on exit + +By default the request session is **not** committed for you β€” you call +`await db.session.commit()` yourself. Set `commit_on_exit=True` to commit +automatically when the request finishes cleanly: + +```python +app.add_middleware( + SQLAlchemyMiddleware, + db_url="postgresql+asyncpg://user:pass@localhost/app", + commit_on_exit=True, +) +``` + +The finalization rules are: + +- **Clean exit + `commit_on_exit=True`** β†’ `commit()`, then `close()`. +- **Clean exit + `commit_on_exit=False`** (default) β†’ just `close()` (uncommitted + work is rolled back by closing). +- **Exception** β†’ `rollback()`, then `close()`. The original exception + propagates; a failure during rollback/commit/close is surfaced too. + +!!! warning "Commit/rollback errors are not swallowed" + If `commit()` fails, the middleware attempts a `rollback()` and raises, so a + write failure can never be reported to the client as success. + +## Outside a request: `async with db()` + +Anywhere there is no request context β€” startup/shutdown hooks, CLI scripts, +background tasks, tests β€” open a session explicitly: + +```python +async def get_db_fetch(): + async with db(): + result = await db.session.execute(foo.select()) + return result.fetchall() +``` + +`db()` accepts the same finalization options as the middleware: + +```python +async with db(commit_on_exit=True): + db.session.add(User(name="ada")) + # committed automatically on a clean exit +``` + +You can also pass `session_args` to override sessionmaker arguments for that one +context: + +```python +async with db(session_args={"expire_on_commit": True}): + ... +``` + +### `MissingSessionError` + +Accessing `db.session` with no active context raises +[`MissingSessionError`](../api-reference.md#exceptions): + +```python +# ❌ no request, no `async with db()` +result = await db.session.execute(foo.select()) # MissingSessionError +``` + +The fix is always the same β€” wrap the access in a context: + +```python +async with db(): + result = await db.session.execute(foo.select()) +``` + +### `SessionNotInitialisedError` + +If you access `db.session` before any `SQLAlchemyMiddleware` has been +constructed (so the sessionmaker doesn't exist yet), you get +[`SessionNotInitialisedError`](../api-reference.md#exceptions) instead. Make sure +`app.add_middleware(SQLAlchemyMiddleware, ...)` runs during app setup. + +## Where to go next + +- Run **many** sessions at once β†’ [Concurrent Queries](concurrency.md) +- Stream a large response body β†’ [Streaming Responses](streaming.md) +- Understand engine ownership and shutdown β†’ [Engine Lifecycle](engine-lifecycle.md) + +[contextvar]: https://docs.python.org/3/library/contextvars.html diff --git a/docs/guide/streaming.md b/docs/guide/streaming.md new file mode 100644 index 0000000..adffb82 --- /dev/null +++ b/docs/guide/streaming.md @@ -0,0 +1,102 @@ +# Streaming Responses + +A `StreamingResponse` (or `FileResponse`) has a **different lifetime** from a +normal request transaction. The body keeps yielding chunks *after* your route +function returns, but the middleware-managed request session is tied to the +request transaction β€” not to the stream. So you must not rely on `db.session` +staying open while a streaming body runs. + +The rule: **open an explicit session inside the generator** so the body owns its +own database lifetime. + +## The right way + +```python +from fastapi.responses import StreamingResponse +from fastapi_async_sqlalchemy import db + +@app.get("/export") +async def export(): + async def rows(): + async with db(): # body-owned session + result = await db.session.stream(foo.select()) + async for row in result: + yield f"{row.id}\n".encode() + + return StreamingResponse(rows(), media_type="text/plain") +``` + +The `async with db()` inside the generator makes the session lifetime explicit +and keeps the session open for the whole body. + +## Why not `commit_on_exit=True`? + +Implicit `commit_on_exit=True` is **not** a safe way to report streaming write +success. The response may have already started β€” and early chunks already sent β€” +before an unbounded body finishes. A late commit failure cannot un-send those +chunks. + +To enforce this, the middleware actively rejects the unsafe combination. If a +streaming response begins while `commit_on_exit=True` **and** the request +session was already used, it raises: + +```text +RuntimeError: `commit_on_exit=True` cannot use the middleware-managed request +database session with a streaming response. Use `async with db()` inside the +streaming generator, or manage the streaming transaction explicitly. +``` + +Similarly, once the request session has been closed for streaming, touching it +again raises a `RuntimeError` telling you to use `async with db()` inside the +generator. + +## If a streaming route needs to write + +Pick one of two explicit patterns: + +=== "Commit before streaming" + + Complete and commit the write in its own context **before** creating the + streaming response, then stream read-only: + + ```python + @app.post("/report") + async def make_report(): + async with db(commit_on_exit=True): + db.session.add(ReportRun(status="started")) + # committed here, before any streaming begins + + async def body(): + async with db(): + result = await db.session.stream(rows.select()) + async for row in result: + yield serialize(row) + + return StreamingResponse(body()) + ``` + +=== "Write inside the generator" + + Make the generator own an explicit write transaction and design the API so + clients don't treat early chunks as confirmation of a completed write: + + ```python + @app.get("/stream-and-write") + async def stream_and_write(): + async def body(): + async with db(commit_on_exit=True): + async for row in produce(): + db.session.add(AuditRow(data=row)) + yield row + # committed when the generator's context exits + + return StreamingResponse(body()) + ``` + +## Migrating existing code + +If you previously used `db.session` directly inside a streaming generator, move +that code into a generator-owned `async with db()` context as shown above. This +keeps database access available for the whole body while making it clear that +the session lifetime belongs to the stream, not the original request +transaction. diff --git a/docs/guide/type-hints.md b/docs/guide/type-hints.md new file mode 100644 index 0000000..5ce1eba --- /dev/null +++ b/docs/guide/type-hints.md @@ -0,0 +1,91 @@ +# Type Hints + +The package ships a `py.typed` marker, so type checkers read its inline +annotations. The one piece worth knowing about is how to annotate the `db` +proxy itself. + +## Annotating `db` with `DBSessionMeta` + +Use `DBSessionMeta` (or its alias `DBSessionType`) when you need to type a +function or attribute that holds the `db` proxy: + +```python +from fastapi_async_sqlalchemy import DBSessionMeta, db + + +def get_db() -> DBSessionMeta: + return db +``` + +This gives static checkers (mypy, pyright) and your IDE full autocomplete for +the proxy surface β€” `session`, `connection()`, `gather()` and the `db(...)` +call. + +## Runtime vs. type-check behavior + +`DBSessionMeta` is deliberately two things at once: + +- **At runtime** it is the actual metaclass of `db`, so identity and instance + checks work as they did in earlier versions: + + ```python + from fastapi_async_sqlalchemy import DBSessionMeta, db + + assert isinstance(db, DBSessionMeta) + assert type(db) is DBSessionMeta + ``` + +- **At type-check time** it resolves to a structural + [`Protocol`](https://docs.python.org/3/library/typing.html#typing.Protocol) + describing the public API. That's what powers autocomplete and `mypy` + checking when you annotate with it. + +The Protocol surface is: + +```python +class DBSessionMeta(Protocol): + @property + def session(self) -> AsyncSession: ... + + def connection(self) -> AbstractAsyncContextManager[AsyncSession]: ... + + async def gather( + self, *coros_or_futures: Any, return_exceptions: bool = ... + ) -> list[Any]: ... + + def __call__( + self, + session_args: dict[str, Any] | None = ..., + commit_on_exit: bool = ..., + multi_sessions: bool = ..., + max_concurrent: int | None = ..., + ) -> AbstractAsyncContextManager[Any]: ... +``` + +`DBSessionType` is an exported alias of `DBSessionMeta` β€” use whichever name +reads better in your codebase. + +## Dependency-injection style + +If you prefer passing `db` explicitly (e.g. for testability) rather than +importing the global, the annotation makes it first-class: + +```python +from fastapi import Depends +from fastapi_async_sqlalchemy import DBSessionMeta, db + + +def get_db() -> DBSessionMeta: + return db + + +async def list_users(database: DBSessionMeta = Depends(get_db)): + result = await database.session.execute(users.select()) + return result.fetchall() +``` + +## Works with SQLModel + +If `sqlmodel` is installed, the middleware uses `sqlmodel`'s `AsyncSession` +subclass automatically, so `db.session` exposes SQLModel's session API. No extra +configuration is needed. diff --git a/docs/index.md b/docs/index.md new file mode 100644 index 0000000..7fce099 --- /dev/null +++ b/docs/index.md @@ -0,0 +1,126 @@ +--- +hide: + - navigation + - toc +--- + +
+ +# FastAPI Async SQLAlchemy + +Drop-in async SQLAlchemy middleware for FastAPI. A request-scoped +`AsyncSession` you reach through a single global `db` β€” no per-route +dependency wiring, no manual session plumbing. + +
+[Get started :material-arrow-right:](getting-started.md){ .md-button .md-button--primary } +[View on GitHub](https://github.com/h0rn3t/fastapi-async-sqlalchemy){ .md-button } +
+ +
+[![PyPI](https://img.shields.io/pypi/v/fastapi_async_sqlalchemy?color=0e8a8a&label=pypi)](https://pypi.org/project/fastapi-async-sqlalchemy/) +[![Downloads](https://static.pepy.tech/badge/fastapi-async-sqlalchemy)](https://pepy.tech/project/fastapi-async-sqlalchemy) +[![License: MIT](https://img.shields.io/badge/License-MIT-yellow.svg)](https://opensource.org/licenses/MIT) +[![CI](https://github.com/h0rn3t/fastapi-async-sqlalchemy/workflows/ci/badge.svg)](https://github.com/h0rn3t/fastapi-async-sqlalchemy/actions) +
+ +
+ +## Why this middleware? + +SQLAlchemy's `AsyncSession` is not safe to share across concurrent tasks, and +FastAPI gives you a fresh request per coroutine. This middleware binds **one +session to each request context** using a Python [`ContextVar`][contextvar], so +`db.session` always resolves to the right session for the request you're in β€” +whether you access it from a route, a service function, or a background helper. + +```python +from fastapi import FastAPI +from fastapi_async_sqlalchemy import SQLAlchemyMiddleware, db +from sqlalchemy import text + +app = FastAPI() +app.add_middleware( + SQLAlchemyMiddleware, + db_url="postgresql+asyncpg://user:pass@localhost:5432/app", +) + +@app.get("/ping") +async def ping(): + result = await db.session.execute(text("SELECT 1")) + return {"db": result.scalar()} +``` + +No `Depends(get_session)`, no passing the session down every call. Access the +session anywhere in the request with `db.session`. + +## Features + +
+ +- :material-database-sync:{ .lg .middle } __Request-scoped sessions__ + + --- + + A `ContextVar`-backed `AsyncSession` per request. Reach it from anywhere + with `db.session` β€” no dependency injection boilerplate. + + [:octicons-arrow-right-24: Sessions & contexts](guide/sessions.md) + +- :material-engine:{ .lg .middle } __Engine lifecycle done right__ + + --- + + Pass a `db_url` and the middleware owns and disposes the engine on + shutdown; pass a `custom_engine` and you keep ownership. + + [:octicons-arrow-right-24: Engine lifecycle](guide/engine-lifecycle.md) + +- :material-arrow-decision:{ .lg .middle } __Pool-throttled concurrency__ + + --- + + Run many queries in parallel without exhausting the pool. `db.gather()` + and `db.connection()` cap in-flight sessions at `max_concurrent`. + + [:octicons-arrow-right-24: Concurrent queries](guide/concurrency.md) + +- :material-transit-connection-variant:{ .lg .middle } __Multiple databases__ + + --- + + Build independent middleware/proxy pairs with + `create_middleware_and_session_proxy()` β€” one per database. + + [:octicons-arrow-right-24: Multiple databases](guide/multi-database.md) + +- :material-download-network:{ .lg .middle } __Streaming-aware__ + + --- + + Clear rules for `StreamingResponse` so the session lifetime belongs to the + body, not a closed request transaction. + + [:octicons-arrow-right-24: Streaming responses](guide/streaming.md) + +- :material-language-python:{ .lg .middle } __Typed & SQLModel-ready__ + + --- + + Ships `py.typed`, a `DBSessionMeta` Protocol for full autocomplete, and + works transparently with `sqlmodel`. + + [:octicons-arrow-right-24: Type hints](guide/type-hints.md) + +
+ +## Installation + +```bash +pip install fastapi-async-sqlalchemy +``` + +Requires Python 3.12+, `starlette>=0.40`, and `SQLAlchemy>=2.0`. Add the async +driver for your database (`asyncpg`, `aiomysql`, `aiosqlite`, …). + +[contextvar]: https://docs.python.org/3/library/contextvars.html diff --git a/mkdocs.yml b/mkdocs.yml new file mode 100644 index 0000000..accc708 --- /dev/null +++ b/mkdocs.yml @@ -0,0 +1,124 @@ +site_name: FastAPI Async SQLAlchemy +site_description: >- + Async SQLAlchemy middleware for FastAPI β€” a request-scoped AsyncSession + proxy with multi-database, pool-throttled concurrency, streaming and engine + lifecycle management. +site_author: Eugene Shershen +site_url: https://h0rn3t.github.io/fastapi-async-sqlalchemy/ + +repo_name: h0rn3t/fastapi-async-sqlalchemy +repo_url: https://github.com/h0rn3t/fastapi-async-sqlalchemy +edit_uri: edit/main/docs/ + +copyright: Copyright © Eugene Shershen β€” MIT License + +theme: + name: material + language: en + icon: + logo: material/database-sync + repo: fontawesome/brands/github + favicon: assets/favicon.svg + palette: + - media: "(prefers-color-scheme)" + toggle: + icon: material/brightness-auto + name: Switch to light mode + - media: "(prefers-color-scheme: light)" + scheme: default + primary: teal + accent: deep orange + toggle: + icon: material/weather-sunny + name: Switch to dark mode + - media: "(prefers-color-scheme: dark)" + scheme: slate + primary: teal + accent: amber + toggle: + icon: material/weather-night + name: Switch to system preference + font: + text: Inter + code: JetBrains Mono + features: + - navigation.tabs + - navigation.tabs.sticky + - navigation.sections + - navigation.top + - navigation.tracking + - navigation.indexes + - navigation.footer + - toc.follow + - search.suggest + - search.highlight + - search.share + - content.code.copy + - content.code.annotate + - content.tabs.link + - content.tooltips + +extra: + social: + - icon: fontawesome/brands/github + link: https://github.com/h0rn3t/fastapi-async-sqlalchemy + - icon: fontawesome/brands/python + link: https://pypi.org/project/fastapi-async-sqlalchemy/ + generator: false + +extra_css: + - assets/extra.css + +plugins: + - search + +markdown_extensions: + - abbr + - admonition + - attr_list + - def_list + - footnotes + - md_in_html + - tables + - toc: + permalink: true + title: On this page + - pymdownx.betterem + - pymdownx.caret + - pymdownx.mark + - pymdownx.tilde + - pymdownx.keys + - pymdownx.details + - pymdownx.inlinehilite + - pymdownx.smartsymbols + - pymdownx.highlight: + anchor_linenums: true + line_spans: __span + pygments_lang_class: true + - pymdownx.superfences: + custom_fences: + - name: mermaid + class: mermaid + format: !!python/name:pymdownx.superfences.fence_code_format + - pymdownx.tabbed: + alternate_style: true + - pymdownx.tasklist: + custom_checkbox: true + - pymdownx.emoji: + emoji_index: !!python/name:material.extensions.emoji.twemoji + emoji_generator: !!python/name:material.extensions.emoji.to_svg + +nav: + - Home: index.md + - Getting Started: getting-started.md + - User Guide: + - guide/index.md + - Sessions & Contexts: guide/sessions.md + - Engine Lifecycle: guide/engine-lifecycle.md + - Concurrent Queries: guide/concurrency.md + - Streaming Responses: guide/streaming.md + - Multiple Databases: guide/multi-database.md + - SQLAlchemy Events: guide/events.md + - Type Hints: guide/type-hints.md + - API Reference: api-reference.md + - FAQ & Troubleshooting: faq.md diff --git a/requirements-docs.txt b/requirements-docs.txt new file mode 100644 index 0000000..19058a1 --- /dev/null +++ b/requirements-docs.txt @@ -0,0 +1,2 @@ +# Documentation toolchain β€” install with: pip install -r requirements-docs.txt +mkdocs-material>=9.5,<10 From c6272d20d3807d5ba5b365f918deb2a541a322d4 Mon Sep 17 00:00:00 2001 From: Eugene Shershen Date: Mon, 15 Jun 2026 00:28:37 +0300 Subject: [PATCH 4/4] update idna version constraint to >=3.15 in requirements.txt --- requirements.txt | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/requirements.txt b/requirements.txt index e514fb1..0c86149 100644 --- a/requirements.txt +++ b/requirements.txt @@ -8,7 +8,7 @@ coverage>=5.2.1 entrypoints==0.3 fastapi>=0.115 flake8==3.7.9 -idna==3.7 +idna>=3.15 importlib-metadata==1.5.0 isort==5.13.2 mccabe==0.6.1