. The middleware opened a session for the
+request, made it available as `db.session`, and closed it when the response was
+sent.
+
+!!! tip "That's the whole idea"
+ You never created or passed a session. `db.session` is bound to the current
+ request via a `ContextVar`, so it resolves correctly even from helper
+ functions called deep inside the request.
+
+## Configuring the engine
+
+Pass engine and session options through `engine_args` / `session_args`. These
+are forwarded verbatim to SQLAlchemy's
+[`create_async_engine`](https://docs.sqlalchemy.org/en/20/orm/extensions/asyncio.html)
+and `async_sessionmaker`.
+
+```python
+app.add_middleware(
+ SQLAlchemyMiddleware,
+ db_url="postgresql+asyncpg://user:pass@localhost:5432/app",
+ engine_args={
+ "echo": True, # log every SQL statement
+ "pool_pre_ping": True, # validate connections before use
+ "pool_size": 5, # connections kept open
+ "max_overflow": 10, # extra connections allowed above pool_size
+ },
+ commit_on_exit=True, # commit the request session on a clean exit
+)
+```
+
+## Using the session outside a request
+
+Outside the request/response cycle (startup hooks, scripts, workers) there is no
+middleware to open a session for you. Open one explicitly with the `db` context
+manager:
+
+```python
+async def warm_cache():
+ async with db():
+ result = await db.session.execute(files.select())
+ return result.fetchall()
+
+
+@app.on_event("startup")
+async def on_startup():
+ await warm_cache()
+```
+
+## Next steps
+
+
+
+- :material-database-sync: [__Sessions & contexts__](guide/sessions.md) β how `db.session` and `async with db()` behave
+- :material-engine: [__Engine lifecycle__](guide/engine-lifecycle.md) β ownership, disposal, graceful shutdown
+- :material-arrow-decision: [__Concurrent queries__](guide/concurrency.md) β run parallel queries safely
+- :material-book-open-variant: [__API reference__](api-reference.md) β every public symbol
+
+
diff --git a/docs/guide/concurrency.md b/docs/guide/concurrency.md
new file mode 100644
index 0000000..4bf7c97
--- /dev/null
+++ b/docs/guide/concurrency.md
@@ -0,0 +1,127 @@
+# Concurrent Queries
+
+A single `AsyncSession` cannot run two operations at once β concurrent use
+raises SQLAlchemy's `InvalidRequestError: This session is provisioning a new
+connection; concurrent operations are not permitted`. To run queries in
+**parallel** you need a session **per task**, and you need to keep the number of
+simultaneous sessions under your connection-pool limit.
+
+`multi_sessions` mode solves both problems.
+
+## The problem with sharing one session
+
+```python
+# β Don't do this β all tasks share the one request session
+async def bad():
+ await asyncio.gather(
+ db.session.execute(text("SELECT 1")),
+ db.session.execute(text("SELECT 2")), # concurrent op on same session
+ )
+```
+
+## `multi_sessions=True`
+
+Opening `db(multi_sessions=True)` switches `db.session` to give **each task its
+own session**, tracked and cleaned up by the middleware:
+
+```python
+import asyncio
+from sqlalchemy import text
+
+async def run():
+ async with db(multi_sessions=True):
+ async def worker(n: int):
+ # each task gets a distinct session
+ return await db.session.execute(text(f"SELECT {n}"))
+
+ await asyncio.gather(*(worker(i) for i in range(5)))
+```
+
+Child task sessions are committed/rolled back and closed for you as each task
+finishes. All child tasks **must complete before the `async with` block exits**.
+
+## Throttling with `max_concurrent`
+
+Unbounded parallelism will exhaust the pool and raise
+`TimeoutError: QueuePool limit ... reached`. Set `max_concurrent` to cap the
+number of sessions holding a connection at once. When you do, child tasks must
+acquire their session through **`db.connection()`** or **`db.gather()`** so the
+middleware owns both the session lifetime and the semaphore slot.
+
+### `db.gather()` β the easy path
+
+A drop-in, pool-aware replacement for `asyncio.gather`. Each coroutine acquires
+a slot (and a session) before it runs and releases it afterwards:
+
+```python
+async def do_work(n: int) -> int:
+ async with db.connection() as session:
+ result = await session.execute(text(f"SELECT {n}"))
+ return result.scalar_one()
+
+
+async def run():
+ async with db(multi_sessions=True, max_concurrent=10):
+ results = await db.gather(*(do_work(i) for i in range(100)))
+ # never more than 10 connections in flight
+```
+
+!!! warning "Pass coroutines, not Tasks/Futures"
+ When `max_concurrent` is set, `db.gather()` accepts **coroutine objects
+ only**. A pre-created `Task` or `Future` may already be running outside the
+ semaphore, so it is rejected with `TypeError`. Pass `do_work(i)`, not
+ `asyncio.create_task(do_work(i))`.
+
+### `db.connection()` β explicit slots
+
+When you create your own tasks, open the session inside each task with
+`db.connection()`. It waits for a free slot before creating the session and
+releases it when the block exits:
+
+```python
+async def run():
+ async with db(multi_sessions=True, max_concurrent=10):
+ async def execute_query(query: str):
+ async with db.connection() as session:
+ return await session.execute(text(query))
+
+ tasks = [
+ asyncio.create_task(execute_query(f"SELECT {i}"))
+ for i in range(50)
+ ]
+ await asyncio.gather(*tasks)
+```
+
+Without `max_concurrent`, `db.connection()` still works β it just creates a
+session without throttling and cleans it up on exit.
+
+## Rules to remember
+
+- Child tasks that use the database **must finish before** the owning
+ `async with db(multi_sessions=True)` block exits. Tasks still parked on the
+ semaphore (or still running) when the block starts closing are cancelled.
+- With `max_concurrent` set, **direct `db.session` access from a child task is
+ rejected** β it isn't throttled. Use `db.connection()` or `db.gather()`
+ instead. (The parent task may still use `db.session` directly.)
+- Creating a new `db.connection()` session **after** the context has begun
+ closing raises `RuntimeError`.
+- `max_concurrent` must be `>= 1`, otherwise `ValueError` is raised.
+
+## Choosing an approach
+
+```mermaid
+flowchart TD
+ A[Need parallel DB work?] -->|no| B[Use db.session directly]
+ A -->|yes| C{Bounded by pool?}
+ C -->|"just a few tasks"| D["db(multi_sessions=True)"]
+ C -->|"many tasks"| E["db(multi_sessions=True, max_concurrent=N)"]
+ E --> F{Own your tasks?}
+ F -->|no, pass coroutines| G["db.gather(...)"]
+ F -->|yes, create_task| H["db.connection() inside each task"]
+```
+
+| Scenario | Use |
+| ------------------------------------------ | ---------------------------------------------- |
+| A handful of parallel queries | `db(multi_sessions=True)` + `db.session` |
+| Many queries, cap connections, simplest | `db(multi_sessions=True, max_concurrent=N)` + `db.gather()` |
+| Many queries, you manage the tasks | `db(multi_sessions=True, max_concurrent=N)` + `db.connection()` |
diff --git a/docs/guide/engine-lifecycle.md b/docs/guide/engine-lifecycle.md
new file mode 100644
index 0000000..c593105
--- /dev/null
+++ b/docs/guide/engine-lifecycle.md
@@ -0,0 +1,101 @@
+# Engine Lifecycle
+
+The middleware can either **own** the SQLAlchemy async engine or **borrow** one
+you created. Ownership decides who disposes the connection pool β and getting
+this wrong leaks connections on shutdown. This page makes the rules explicit.
+
+## Two ways to provide an engine
+
+=== "Middleware owns the engine (`db_url`)"
+
+ ```python
+ app.add_middleware(
+ SQLAlchemyMiddleware,
+ db_url="postgresql+asyncpg://user:pass@localhost/app",
+ engine_args={"pool_size": 5, "max_overflow": 10},
+ )
+ ```
+
+ The middleware calls `create_async_engine(db_url, **engine_args)`, **owns**
+ the result, and disposes it during ASGI lifespan shutdown.
+
+=== "You own the engine (`custom_engine`)"
+
+ ```python
+ from sqlalchemy.ext.asyncio import create_async_engine
+
+ engine = create_async_engine("postgresql+asyncpg://user:pass@localhost/app")
+ app.add_middleware(SQLAlchemyMiddleware, custom_engine=engine)
+
+ # later, in your own shutdown / test cleanup:
+ await engine.dispose()
+ ```
+
+ The middleware uses your engine but **never disposes it**. Disposal is your
+ responsibility.
+
+!!! note "One of the two is required"
+ You must pass either `db_url` or `custom_engine`. Passing neither raises
+ `ValueError`.
+
+## Disposal during ASGI lifespan
+
+For a middleware-owned engine (`db_url`), disposal is automatic. It happens when
+the lifespan ends β including failure paths β so a raising shutdown handler
+can't leak the pool:
+
+- `lifespan.shutdown.complete`
+- `lifespan.shutdown.failed`
+- `lifespan.startup.failed`
+
+The engine is disposed **once**, for the application lifetime β not per request.
+
+```python
+from fastapi.testclient import TestClient
+
+# Running the lifespan (e.g. via TestClient context) triggers disposal:
+with TestClient(app):
+ ...
+# <- engine disposed here
+```
+
+!!! warning "Disposal blocks the shutdown ack"
+ Engine disposal runs **before** the lifespan acknowledgement is forwarded to
+ the ASGI server, so a slow pool drain delays graceful shutdown. Configure
+ your server's graceful-shutdown timeout (e.g. uvicorn's
+ `--timeout-graceful-shutdown`) to cover the worst-case time to close active
+ connections.
+
+## Manual disposal outside a lifespan
+
+When you build `SQLAlchemyMiddleware(db_url=...)` **outside** an ASGI lifespan β
+a script, an ad-hoc harness, a non-ASGI runtime β there is no
+`lifespan.shutdown` event, so nothing triggers disposal. Call `dispose()`
+yourself:
+
+```python
+middleware = SQLAlchemyMiddleware(app, db_url="postgresql+asyncpg://...")
+try:
+ ... # use db.session
+finally:
+ await middleware.dispose()
+```
+
+`dispose()` is:
+
+- **Idempotent on success** β calling it again is a no-op.
+- **Safe to retry on failure** β the proxy's session bindings are cleared
+ deterministically, so a later call actually re-attempts `engine.dispose()`
+ rather than silently no-op'ing on a half-disposed engine.
+- **A no-op for borrowed engines** β if you passed `custom_engine`, `dispose()`
+ does nothing; you own that engine.
+
+The same guidance applies to each pair returned by
+[`create_middleware_and_session_proxy()`](multi-database.md).
+
+## Summary
+
+| You pass | Engine created by | Disposed by | When |
+| ---------------- | ----------------- | ------------------------------- | -------------------------------------- |
+| `db_url` | the middleware | the middleware | lifespan shutdown, or `dispose()` |
+| `custom_engine` | you | you (`await engine.dispose()`) | whenever your own cleanup runs |
diff --git a/docs/guide/events.md b/docs/guide/events.md
new file mode 100644
index 0000000..70ba4bd
--- /dev/null
+++ b/docs/guide/events.md
@@ -0,0 +1,85 @@
+# SQLAlchemy Events
+
+SQLAlchemy's [event system](https://docs.sqlalchemy.org/en/20/orm/events.html)
+is **independent of the session and engine**. This middleware doesn't change how
+events fire β register listeners on your mapped classes (or on `Mapper` /
+`Session`) with `sqlalchemy.event.listens_for` exactly as you would in a
+synchronous SQLAlchemy setup.
+
+## Registering listeners
+
+```python
+from datetime import datetime
+from sqlalchemy import Column, DateTime, Integer, String, event
+from sqlalchemy.orm import DeclarativeBase
+
+
+class Base(DeclarativeBase):
+ pass
+
+
+class User(Base):
+ __tablename__ = "users"
+ id = Column(Integer, primary_key=True)
+ username = Column(String(50), unique=True, nullable=False)
+ created_at = Column(DateTime, default=datetime.utcnow)
+ updated_at = Column(DateTime, default=datetime.utcnow)
+
+
+@event.listens_for(User, "before_insert")
+def normalize(mapper, connection, target):
+ target.username = target.username.lower().strip()
+
+
+@event.listens_for(User, "before_update")
+def touch_updated_at(mapper, connection, target):
+ target.updated_at = datetime.utcnow()
+
+
+@event.listens_for(User, "after_insert")
+def log_insert(mapper, connection, target):
+ print(f"user created: id={target.id}")
+```
+
+These fire when the session flushes, just like always.
+
+## :warning: Mapper events are synchronous
+
+Mapper-level events receive a **synchronous** `connection` argument:
+
+`before_insert` Β· `after_insert` Β· `before_update` Β· `after_update` Β·
+`before_delete` Β· `after_delete`
+
+Inside these handlers:
+
+- **Do not** `await` anything.
+- **Do not** call async ORM APIs.
+
+```python
+@event.listens_for(User, "before_insert")
+def handler(mapper, connection, target):
+ # β
pure, synchronous work on `target`
+ target.username = target.username.strip().lower()
+ # β await db.session.execute(...) <- not allowed here
+```
+
+If you need async work after a write, do it **after** `await
+db.session.commit()` returns, or use `Session`-level events such as
+`after_flush` / `after_commit` and schedule the async work from there.
+
+## A complete example
+
+A runnable example with validation, automatic timestamps, audit logging, and a
+commented-out soft-delete hook lives in the repository at
+[`examples/events_example.py`](https://github.com/h0rn3t/fastapi-async-sqlalchemy/blob/main/examples/events_example.py).
+It wires the listeners above into a small FastAPI CRUD app:
+
+```python
+@app.post("/users")
+async def create_user(username: str, email: str, full_name: str | None = None):
+ async with db():
+ user = User(username=username, email=email, full_name=full_name)
+ db.session.add(user)
+ await db.session.commit() # before_insert / after_insert fire on flush
+ return {"id": user.id, "username": user.username}
+```
diff --git a/docs/guide/index.md b/docs/guide/index.md
new file mode 100644
index 0000000..a682eab
--- /dev/null
+++ b/docs/guide/index.md
@@ -0,0 +1,87 @@
+# User Guide
+
+The middleware is small but covers several distinct concerns. Pick the topic you
+need β each page is self-contained.
+
+
+
+- :material-database-sync:{ .lg .middle } __Sessions & Contexts__
+
+ ---
+
+ The `db.session` proxy, the `async with db()` context manager,
+ `commit_on_exit`, and when each session is created and closed.
+
+ [:octicons-arrow-right-24: Read](sessions.md)
+
+- :material-engine:{ .lg .middle } __Engine Lifecycle__
+
+ ---
+
+ Who owns the engine (`db_url` vs `custom_engine`), when it is disposed, and
+ how to dispose it manually outside an ASGI lifespan.
+
+ [:octicons-arrow-right-24: Read](engine-lifecycle.md)
+
+- :material-arrow-decision:{ .lg .middle } __Concurrent Queries__
+
+ ---
+
+ `multi_sessions`, `max_concurrent`, `db.gather()` and `db.connection()` β
+ parallel work without blowing past the connection pool.
+
+ [:octicons-arrow-right-24: Read](concurrency.md)
+
+- :material-download-network:{ .lg .middle } __Streaming Responses__
+
+ ---
+
+ Why streaming bodies need their own session and how to write one safely.
+
+ [:octicons-arrow-right-24: Read](streaming.md)
+
+- :material-transit-connection-variant:{ .lg .middle } __Multiple Databases__
+
+ ---
+
+ `create_middleware_and_session_proxy()` for independent apps or databases.
+
+ [:octicons-arrow-right-24: Read](multi-database.md)
+
+- :material-bell-ring:{ .lg .middle } __SQLAlchemy Events__
+
+ ---
+
+ Using `before_insert` / `after_update` and friends with async sessions.
+
+ [:octicons-arrow-right-24: Read](events.md)
+
+- :material-language-python:{ .lg .middle } __Type Hints__
+
+ ---
+
+ Annotate `db` with `DBSessionMeta` for full mypy / IDE support.
+
+ [:octicons-arrow-right-24: Read](type-hints.md)
+
+
+
+## Mental model
+
+```mermaid
+flowchart LR
+ A[HTTP request] --> B[SQLAlchemyMiddleware]
+ B -->|opens| C[AsyncSession bound to ContextVar]
+ C --> D[Route / service code
reads db.session]
+ D --> E{clean exit?}
+ E -->|yes + commit_on_exit| F[commit]
+ E -->|exception| G[rollback]
+ F --> H[close session]
+ G --> H
+ H --> I[response sent]
+```
+
+The session lives for the duration of the request context. Everything in the
+guide is a variation on this: opening extra contexts (`async with db()`),
+running many sessions at once (`multi_sessions`), or moving the lifetime into a
+streaming body.
diff --git a/docs/guide/multi-database.md b/docs/guide/multi-database.md
new file mode 100644
index 0000000..568b1f0
--- /dev/null
+++ b/docs/guide/multi-database.md
@@ -0,0 +1,102 @@
+# Multiple Databases
+
+The default `SQLAlchemyMiddleware` / `db` pair is bound to **one** engine. To
+talk to several independent databases, create a separate middleware/session
+proxy pair for each with `create_middleware_and_session_proxy()`.
+
+## Create one pair per database
+
+```python title="databases.py"
+from fastapi_async_sqlalchemy import create_middleware_and_session_proxy
+
+FirstSQLAlchemyMiddleware, first_db = create_middleware_and_session_proxy()
+SecondSQLAlchemyMiddleware, second_db = create_middleware_and_session_proxy()
+```
+
+Each call returns an independent `(middleware_class, db_proxy)` tuple with its
+own `ContextVar` state and engine binding.
+
+!!! info "The default pair is just a pre-made instance"
+ `SQLAlchemyMiddleware, db = create_middleware_and_session_proxy()` is exactly
+ how the package builds the defaults it exports. Calling it yourself gives you
+ additional, fully isolated pairs.
+
+## Wire them into the app
+
+```python title="main.py"
+from fastapi import FastAPI
+
+from databases import FirstSQLAlchemyMiddleware, SecondSQLAlchemyMiddleware
+from routes import router
+
+app = FastAPI()
+app.include_router(router)
+
+app.add_middleware(
+ FirstSQLAlchemyMiddleware,
+ db_url="postgresql+asyncpg://user:pass@localhost:5432/primary_db",
+ engine_args={"pool_size": 5, "max_overflow": 10},
+)
+app.add_middleware(
+ SecondSQLAlchemyMiddleware,
+ db_url="mysql+aiomysql://user:pass@localhost:3306/secondary_db",
+ engine_args={"pool_size": 5, "max_overflow": 10},
+)
+```
+
+## Use the right proxy in each route
+
+```python title="routes.py"
+from fastapi import APIRouter
+from sqlalchemy import column, table
+
+from databases import first_db, second_db
+
+router = APIRouter()
+files = table("ms_files", column("id"))
+
+
+@router.get("/first-db-files")
+async def get_files_from_first_db():
+ result = await first_db.session.execute(files.select())
+ return result.fetchall()
+
+
+@router.get("/second-db-files")
+async def get_files_from_second_db():
+ result = await second_db.session.execute(files.select())
+ return result.fetchall()
+```
+
+Each proxy resolves its own request-scoped session, so `first_db.session` and
+`second_db.session` never collide.
+
+## One proxy, one live engine
+
+A proxy is **bound to a single live engine**. Reusing the same proxy with a
+different live engine is rejected:
+
+```text
+RuntimeError: This SQLAlchemy session proxy is already bound to another live
+engine. Use create_middleware_and_session_proxy() for independent apps or
+databases.
+```
+
+This guard exists so requests can never silently switch to a different database
+binding. The fix is to use a **fresh pair** per app or database β exactly what
+`create_middleware_and_session_proxy()` is for.
+
+!!! tip "Rebinding after disposal"
+ The binding is cleared when the owning middleware's engine is disposed (via
+ lifespan shutdown or `await middleware.dispose()`). After disposal the proxy
+ is free to bind a new engine β useful in test suites that build and tear down
+ an app per test.
+
+## Concurrency still applies per proxy
+
+Each proxy supports the full [concurrency API](concurrency.md) independently:
+
+```python
+async with first_db(multi_sessions=True, max_concurrent=10):
+ results = await first_db.gather(*(work(i) for i in range(100)))
+```
diff --git a/docs/guide/sessions.md b/docs/guide/sessions.md
new file mode 100644
index 0000000..89fd84a
--- /dev/null
+++ b/docs/guide/sessions.md
@@ -0,0 +1,137 @@
+# Sessions & Contexts
+
+Everything in this library revolves around one idea: **`db.session` is an
+`AsyncSession` bound to the current async context.** This page explains how that
+session is created, where it lives, and how to open your own contexts.
+
+## The `db` proxy
+
+`db` is a global object exported from the package:
+
+```python
+from fastapi_async_sqlalchemy import db
+```
+
+It exposes a small surface:
+
+| Member | Kind | Purpose |
+| ------------------ | --------------------- | ---------------------------------------------------- |
+| `db.session` | property | The `AsyncSession` for the current context |
+| `db(...)` | callable | Open an explicit session context manager |
+| `db.connection()` | method | Throttled session context manager (multi-session) |
+| `db.gather(...)` | coroutine | Pool-aware `asyncio.gather` (multi-session) |
+
+`db.session` is backed by a [`ContextVar`][contextvar], so each request β each
+independent async context β sees its own session. You never pass it around.
+
+## Inside a request
+
+When `SQLAlchemyMiddleware` is installed, every HTTP request gets a session
+opened **before** your route runs and finalized **after** it returns:
+
+```python
+@app.get("/items/{item_id}")
+async def get_item(item_id: int):
+ item = await db.session.get(Item, item_id)
+ return item
+```
+
+The same session is visible from any function called during the request, no
+arguments required:
+
+```python
+async def load_item(item_id: int) -> Item | None:
+ # same session as the route β resolved from the request context
+ return await db.session.get(Item, item_id)
+
+
+@app.get("/items/{item_id}")
+async def get_item(item_id: int):
+ return await load_item(item_id)
+```
+
+### Commit on exit
+
+By default the request session is **not** committed for you β you call
+`await db.session.commit()` yourself. Set `commit_on_exit=True` to commit
+automatically when the request finishes cleanly:
+
+```python
+app.add_middleware(
+ SQLAlchemyMiddleware,
+ db_url="postgresql+asyncpg://user:pass@localhost/app",
+ commit_on_exit=True,
+)
+```
+
+The finalization rules are:
+
+- **Clean exit + `commit_on_exit=True`** β `commit()`, then `close()`.
+- **Clean exit + `commit_on_exit=False`** (default) β just `close()` (uncommitted
+ work is rolled back by closing).
+- **Exception** β `rollback()`, then `close()`. The original exception
+ propagates; a failure during rollback/commit/close is surfaced too.
+
+!!! warning "Commit/rollback errors are not swallowed"
+ If `commit()` fails, the middleware attempts a `rollback()` and raises, so a
+ write failure can never be reported to the client as success.
+
+## Outside a request: `async with db()`
+
+Anywhere there is no request context β startup/shutdown hooks, CLI scripts,
+background tasks, tests β open a session explicitly:
+
+```python
+async def get_db_fetch():
+ async with db():
+ result = await db.session.execute(foo.select())
+ return result.fetchall()
+```
+
+`db()` accepts the same finalization options as the middleware:
+
+```python
+async with db(commit_on_exit=True):
+ db.session.add(User(name="ada"))
+ # committed automatically on a clean exit
+```
+
+You can also pass `session_args` to override sessionmaker arguments for that one
+context:
+
+```python
+async with db(session_args={"expire_on_commit": True}):
+ ...
+```
+
+### `MissingSessionError`
+
+Accessing `db.session` with no active context raises
+[`MissingSessionError`](../api-reference.md#exceptions):
+
+```python
+# β no request, no `async with db()`
+result = await db.session.execute(foo.select()) # MissingSessionError
+```
+
+The fix is always the same β wrap the access in a context:
+
+```python
+async with db():
+ result = await db.session.execute(foo.select())
+```
+
+### `SessionNotInitialisedError`
+
+If you access `db.session` before any `SQLAlchemyMiddleware` has been
+constructed (so the sessionmaker doesn't exist yet), you get
+[`SessionNotInitialisedError`](../api-reference.md#exceptions) instead. Make sure
+`app.add_middleware(SQLAlchemyMiddleware, ...)` runs during app setup.
+
+## Where to go next
+
+- Run **many** sessions at once β [Concurrent Queries](concurrency.md)
+- Stream a large response body β [Streaming Responses](streaming.md)
+- Understand engine ownership and shutdown β [Engine Lifecycle](engine-lifecycle.md)
+
+[contextvar]: https://docs.python.org/3/library/contextvars.html
diff --git a/docs/guide/streaming.md b/docs/guide/streaming.md
new file mode 100644
index 0000000..adffb82
--- /dev/null
+++ b/docs/guide/streaming.md
@@ -0,0 +1,102 @@
+# Streaming Responses
+
+A `StreamingResponse` (or `FileResponse`) has a **different lifetime** from a
+normal request transaction. The body keeps yielding chunks *after* your route
+function returns, but the middleware-managed request session is tied to the
+request transaction β not to the stream. So you must not rely on `db.session`
+staying open while a streaming body runs.
+
+The rule: **open an explicit session inside the generator** so the body owns its
+own database lifetime.
+
+## The right way
+
+```python
+from fastapi.responses import StreamingResponse
+from fastapi_async_sqlalchemy import db
+
+@app.get("/export")
+async def export():
+ async def rows():
+ async with db(): # body-owned session
+ result = await db.session.stream(foo.select())
+ async for row in result:
+ yield f"{row.id}\n".encode()
+
+ return StreamingResponse(rows(), media_type="text/plain")
+```
+
+The `async with db()` inside the generator makes the session lifetime explicit
+and keeps the session open for the whole body.
+
+## Why not `commit_on_exit=True`?
+
+Implicit `commit_on_exit=True` is **not** a safe way to report streaming write
+success. The response may have already started β and early chunks already sent β
+before an unbounded body finishes. A late commit failure cannot un-send those
+chunks.
+
+To enforce this, the middleware actively rejects the unsafe combination. If a
+streaming response begins while `commit_on_exit=True` **and** the request
+session was already used, it raises:
+
+```text
+RuntimeError: `commit_on_exit=True` cannot use the middleware-managed request
+database session with a streaming response. Use `async with db()` inside the
+streaming generator, or manage the streaming transaction explicitly.
+```
+
+Similarly, once the request session has been closed for streaming, touching it
+again raises a `RuntimeError` telling you to use `async with db()` inside the
+generator.
+
+## If a streaming route needs to write
+
+Pick one of two explicit patterns:
+
+=== "Commit before streaming"
+
+ Complete and commit the write in its own context **before** creating the
+ streaming response, then stream read-only:
+
+ ```python
+ @app.post("/report")
+ async def make_report():
+ async with db(commit_on_exit=True):
+ db.session.add(ReportRun(status="started"))
+ # committed here, before any streaming begins
+
+ async def body():
+ async with db():
+ result = await db.session.stream(rows.select())
+ async for row in result:
+ yield serialize(row)
+
+ return StreamingResponse(body())
+ ```
+
+=== "Write inside the generator"
+
+ Make the generator own an explicit write transaction and design the API so
+ clients don't treat early chunks as confirmation of a completed write:
+
+ ```python
+ @app.get("/stream-and-write")
+ async def stream_and_write():
+ async def body():
+ async with db(commit_on_exit=True):
+ async for row in produce():
+ db.session.add(AuditRow(data=row))
+ yield row
+ # committed when the generator's context exits
+
+ return StreamingResponse(body())
+ ```
+
+## Migrating existing code
+
+If you previously used `db.session` directly inside a streaming generator, move
+that code into a generator-owned `async with db()` context as shown above. This
+keeps database access available for the whole body while making it clear that
+the session lifetime belongs to the stream, not the original request
+transaction.
diff --git a/docs/guide/type-hints.md b/docs/guide/type-hints.md
new file mode 100644
index 0000000..5ce1eba
--- /dev/null
+++ b/docs/guide/type-hints.md
@@ -0,0 +1,91 @@
+# Type Hints
+
+The package ships a `py.typed` marker, so type checkers read its inline
+annotations. The one piece worth knowing about is how to annotate the `db`
+proxy itself.
+
+## Annotating `db` with `DBSessionMeta`
+
+Use `DBSessionMeta` (or its alias `DBSessionType`) when you need to type a
+function or attribute that holds the `db` proxy:
+
+```python
+from fastapi_async_sqlalchemy import DBSessionMeta, db
+
+
+def get_db() -> DBSessionMeta:
+ return db
+```
+
+This gives static checkers (mypy, pyright) and your IDE full autocomplete for
+the proxy surface β `session`, `connection()`, `gather()` and the `db(...)`
+call.
+
+## Runtime vs. type-check behavior
+
+`DBSessionMeta` is deliberately two things at once:
+
+- **At runtime** it is the actual metaclass of `db`, so identity and instance
+ checks work as they did in earlier versions:
+
+ ```python
+ from fastapi_async_sqlalchemy import DBSessionMeta, db
+
+ assert isinstance(db, DBSessionMeta)
+ assert type(db) is DBSessionMeta
+ ```
+
+- **At type-check time** it resolves to a structural
+ [`Protocol`](https://docs.python.org/3/library/typing.html#typing.Protocol)
+ describing the public API. That's what powers autocomplete and `mypy`
+ checking when you annotate with it.
+
+The Protocol surface is:
+
+```python
+class DBSessionMeta(Protocol):
+ @property
+ def session(self) -> AsyncSession: ...
+
+ def connection(self) -> AbstractAsyncContextManager[AsyncSession]: ...
+
+ async def gather(
+ self, *coros_or_futures: Any, return_exceptions: bool = ...
+ ) -> list[Any]: ...
+
+ def __call__(
+ self,
+ session_args: dict[str, Any] | None = ...,
+ commit_on_exit: bool = ...,
+ multi_sessions: bool = ...,
+ max_concurrent: int | None = ...,
+ ) -> AbstractAsyncContextManager[Any]: ...
+```
+
+`DBSessionType` is an exported alias of `DBSessionMeta` β use whichever name
+reads better in your codebase.
+
+## Dependency-injection style
+
+If you prefer passing `db` explicitly (e.g. for testability) rather than
+importing the global, the annotation makes it first-class:
+
+```python
+from fastapi import Depends
+from fastapi_async_sqlalchemy import DBSessionMeta, db
+
+
+def get_db() -> DBSessionMeta:
+ return db
+
+
+async def list_users(database: DBSessionMeta = Depends(get_db)):
+ result = await database.session.execute(users.select())
+ return result.fetchall()
+```
+
+## Works with SQLModel
+
+If `sqlmodel` is installed, the middleware uses `sqlmodel`'s `AsyncSession`
+subclass automatically, so `db.session` exposes SQLModel's session API. No extra
+configuration is needed.
diff --git a/docs/index.md b/docs/index.md
new file mode 100644
index 0000000..7fce099
--- /dev/null
+++ b/docs/index.md
@@ -0,0 +1,126 @@
+---
+hide:
+ - navigation
+ - toc
+---
+
+
+
+# FastAPI Async SQLAlchemy
+
+Drop-in async SQLAlchemy middleware for FastAPI. A request-scoped
+`AsyncSession` you reach through a single global `db` β no per-route
+dependency wiring, no manual session plumbing.
+
+
+[Get started :material-arrow-right:](getting-started.md){ .md-button .md-button--primary }
+[View on GitHub](https://github.com/h0rn3t/fastapi-async-sqlalchemy){ .md-button }
+
+
+
+[](https://pypi.org/project/fastapi-async-sqlalchemy/)
+[](https://pepy.tech/project/fastapi-async-sqlalchemy)
+[](https://opensource.org/licenses/MIT)
+[](https://github.com/h0rn3t/fastapi-async-sqlalchemy/actions)
+
+
+
+
+## Why this middleware?
+
+SQLAlchemy's `AsyncSession` is not safe to share across concurrent tasks, and
+FastAPI gives you a fresh request per coroutine. This middleware binds **one
+session to each request context** using a Python [`ContextVar`][contextvar], so
+`db.session` always resolves to the right session for the request you're in β
+whether you access it from a route, a service function, or a background helper.
+
+```python
+from fastapi import FastAPI
+from fastapi_async_sqlalchemy import SQLAlchemyMiddleware, db
+from sqlalchemy import text
+
+app = FastAPI()
+app.add_middleware(
+ SQLAlchemyMiddleware,
+ db_url="postgresql+asyncpg://user:pass@localhost:5432/app",
+)
+
+@app.get("/ping")
+async def ping():
+ result = await db.session.execute(text("SELECT 1"))
+ return {"db": result.scalar()}
+```
+
+No `Depends(get_session)`, no passing the session down every call. Access the
+session anywhere in the request with `db.session`.
+
+## Features
+
+
+
+- :material-database-sync:{ .lg .middle } __Request-scoped sessions__
+
+ ---
+
+ A `ContextVar`-backed `AsyncSession` per request. Reach it from anywhere
+ with `db.session` β no dependency injection boilerplate.
+
+ [:octicons-arrow-right-24: Sessions & contexts](guide/sessions.md)
+
+- :material-engine:{ .lg .middle } __Engine lifecycle done right__
+
+ ---
+
+ Pass a `db_url` and the middleware owns and disposes the engine on
+ shutdown; pass a `custom_engine` and you keep ownership.
+
+ [:octicons-arrow-right-24: Engine lifecycle](guide/engine-lifecycle.md)
+
+- :material-arrow-decision:{ .lg .middle } __Pool-throttled concurrency__
+
+ ---
+
+ Run many queries in parallel without exhausting the pool. `db.gather()`
+ and `db.connection()` cap in-flight sessions at `max_concurrent`.
+
+ [:octicons-arrow-right-24: Concurrent queries](guide/concurrency.md)
+
+- :material-transit-connection-variant:{ .lg .middle } __Multiple databases__
+
+ ---
+
+ Build independent middleware/proxy pairs with
+ `create_middleware_and_session_proxy()` β one per database.
+
+ [:octicons-arrow-right-24: Multiple databases](guide/multi-database.md)
+
+- :material-download-network:{ .lg .middle } __Streaming-aware__
+
+ ---
+
+ Clear rules for `StreamingResponse` so the session lifetime belongs to the
+ body, not a closed request transaction.
+
+ [:octicons-arrow-right-24: Streaming responses](guide/streaming.md)
+
+- :material-language-python:{ .lg .middle } __Typed & SQLModel-ready__
+
+ ---
+
+ Ships `py.typed`, a `DBSessionMeta` Protocol for full autocomplete, and
+ works transparently with `sqlmodel`.
+
+ [:octicons-arrow-right-24: Type hints](guide/type-hints.md)
+
+
+
+## Installation
+
+```bash
+pip install fastapi-async-sqlalchemy
+```
+
+Requires Python 3.12+, `starlette>=0.40`, and `SQLAlchemy>=2.0`. Add the async
+driver for your database (`asyncpg`, `aiomysql`, `aiosqlite`, β¦).
+
+[contextvar]: https://docs.python.org/3/library/contextvars.html
diff --git a/mkdocs.yml b/mkdocs.yml
new file mode 100644
index 0000000..accc708
--- /dev/null
+++ b/mkdocs.yml
@@ -0,0 +1,124 @@
+site_name: FastAPI Async SQLAlchemy
+site_description: >-
+ Async SQLAlchemy middleware for FastAPI β a request-scoped AsyncSession
+ proxy with multi-database, pool-throttled concurrency, streaming and engine
+ lifecycle management.
+site_author: Eugene Shershen
+site_url: https://h0rn3t.github.io/fastapi-async-sqlalchemy/
+
+repo_name: h0rn3t/fastapi-async-sqlalchemy
+repo_url: https://github.com/h0rn3t/fastapi-async-sqlalchemy
+edit_uri: edit/main/docs/
+
+copyright: Copyright © Eugene Shershen β MIT License
+
+theme:
+ name: material
+ language: en
+ icon:
+ logo: material/database-sync
+ repo: fontawesome/brands/github
+ favicon: assets/favicon.svg
+ palette:
+ - media: "(prefers-color-scheme)"
+ toggle:
+ icon: material/brightness-auto
+ name: Switch to light mode
+ - media: "(prefers-color-scheme: light)"
+ scheme: default
+ primary: teal
+ accent: deep orange
+ toggle:
+ icon: material/weather-sunny
+ name: Switch to dark mode
+ - media: "(prefers-color-scheme: dark)"
+ scheme: slate
+ primary: teal
+ accent: amber
+ toggle:
+ icon: material/weather-night
+ name: Switch to system preference
+ font:
+ text: Inter
+ code: JetBrains Mono
+ features:
+ - navigation.tabs
+ - navigation.tabs.sticky
+ - navigation.sections
+ - navigation.top
+ - navigation.tracking
+ - navigation.indexes
+ - navigation.footer
+ - toc.follow
+ - search.suggest
+ - search.highlight
+ - search.share
+ - content.code.copy
+ - content.code.annotate
+ - content.tabs.link
+ - content.tooltips
+
+extra:
+ social:
+ - icon: fontawesome/brands/github
+ link: https://github.com/h0rn3t/fastapi-async-sqlalchemy
+ - icon: fontawesome/brands/python
+ link: https://pypi.org/project/fastapi-async-sqlalchemy/
+ generator: false
+
+extra_css:
+ - assets/extra.css
+
+plugins:
+ - search
+
+markdown_extensions:
+ - abbr
+ - admonition
+ - attr_list
+ - def_list
+ - footnotes
+ - md_in_html
+ - tables
+ - toc:
+ permalink: true
+ title: On this page
+ - pymdownx.betterem
+ - pymdownx.caret
+ - pymdownx.mark
+ - pymdownx.tilde
+ - pymdownx.keys
+ - pymdownx.details
+ - pymdownx.inlinehilite
+ - pymdownx.smartsymbols
+ - pymdownx.highlight:
+ anchor_linenums: true
+ line_spans: __span
+ pygments_lang_class: true
+ - pymdownx.superfences:
+ custom_fences:
+ - name: mermaid
+ class: mermaid
+ format: !!python/name:pymdownx.superfences.fence_code_format
+ - pymdownx.tabbed:
+ alternate_style: true
+ - pymdownx.tasklist:
+ custom_checkbox: true
+ - pymdownx.emoji:
+ emoji_index: !!python/name:material.extensions.emoji.twemoji
+ emoji_generator: !!python/name:material.extensions.emoji.to_svg
+
+nav:
+ - Home: index.md
+ - Getting Started: getting-started.md
+ - User Guide:
+ - guide/index.md
+ - Sessions & Contexts: guide/sessions.md
+ - Engine Lifecycle: guide/engine-lifecycle.md
+ - Concurrent Queries: guide/concurrency.md
+ - Streaming Responses: guide/streaming.md
+ - Multiple Databases: guide/multi-database.md
+ - SQLAlchemy Events: guide/events.md
+ - Type Hints: guide/type-hints.md
+ - API Reference: api-reference.md
+ - FAQ & Troubleshooting: faq.md
diff --git a/requirements-docs.txt b/requirements-docs.txt
new file mode 100644
index 0000000..19058a1
--- /dev/null
+++ b/requirements-docs.txt
@@ -0,0 +1,2 @@
+# Documentation toolchain β install with: pip install -r requirements-docs.txt
+mkdocs-material>=9.5,<10
diff --git a/requirements.txt b/requirements.txt
index e514fb1..0c86149 100644
--- a/requirements.txt
+++ b/requirements.txt
@@ -8,7 +8,7 @@ coverage>=5.2.1
entrypoints==0.3
fastapi>=0.115
flake8==3.7.9
-idna==3.7
+idna>=3.15
importlib-metadata==1.5.0
isort==5.13.2
mccabe==0.6.1
diff --git a/tests/test_concurrent_queries_postgres.py b/tests/test_concurrent_queries_postgres.py
new file mode 100644
index 0000000..b4c9ed3
--- /dev/null
+++ b/tests/test_concurrent_queries_postgres.py
@@ -0,0 +1,111 @@
+"""Real PostgreSQL/asyncpg reproduction of the `isce` concurrent-operations error.
+
+The production traceback originates here::
+
+ sqlalchemy.exc.InvalidRequestError: This session is provisioning a new
+ connection; concurrent operations are not permitted
+ (https://sqlalche.me/e/20/isce)
+
+The error is triggered by ``await asyncio.gather(session.execute(...),
+session.execute(...))`` against a single AsyncSession backed by asyncpg.
+SQLite/aiosqlite serialises internally and does NOT reliably reproduce it
+(see ``test_concurrent_queries.py``), so this module targets a real Postgres
+instance and is skipped unless ``POSTGRES_TEST_URL`` is set.
+
+Run locally::
+
+ POSTGRES_TEST_URL="postgresql+asyncpg://user:pass@localhost:5432/test" \\
+ pytest tests/test_concurrent_queries_postgres.py -v
+"""
+
+import asyncio
+import os
+import uuid
+
+import pytest
+from sqlalchemy import text
+
+POSTGRES_URL = os.getenv("POSTGRES_TEST_URL")
+
+pytestmark = pytest.mark.skipif(
+ not POSTGRES_URL,
+ reason="POSTGRES_TEST_URL not set; this test requires a real PostgreSQL/asyncpg instance",
+)
+
+
+@pytest.fixture
+def table_name():
+ return f"isce_repro_{uuid.uuid4().hex[:8]}"
+
+
+@pytest.fixture
+async def setup_table(app, db, SQLAlchemyMiddleware, table_name):
+ SQLAlchemyMiddleware(app, db_url=POSTGRES_URL)
+ async with db(commit_on_exit=True):
+ await db.session.execute(
+ text(f"CREATE TABLE {table_name} (id SERIAL PRIMARY KEY, name TEXT NOT NULL)")
+ )
+ await db.session.execute(
+ text(
+ f"INSERT INTO {table_name} (name) "
+ "SELECT 'row_' || g FROM generate_series(1, 50) g"
+ )
+ )
+ try:
+ yield
+ finally:
+ async with db(commit_on_exit=True):
+ await db.session.execute(text(f"DROP TABLE IF EXISTS {table_name}"))
+
+
+@pytest.mark.asyncio
+async def test_gather_on_same_session_raises_isce(db, table_name, setup_table):
+ """asyncio.gather() on a single AsyncSession must raise isce on asyncpg."""
+ count_stmt = text(f"SELECT COUNT(*) FROM {table_name}")
+ rows_stmt = text(f"SELECT id, name FROM {table_name} LIMIT 10")
+
+ with pytest.raises(Exception) as exc_info:
+ async with db():
+ await asyncio.gather(
+ db.session.execute(count_stmt),
+ db.session.execute(rows_stmt),
+ )
+
+ error_msg = str(exc_info.value).lower()
+ assert (
+ "concurrent operations are not permitted" in error_msg
+ or "provisioning a new connection" in error_msg
+ or "isce" in error_msg
+ ), f"Expected isce error, got: {exc_info.value!r}"
+
+
+@pytest.mark.asyncio
+async def test_db_gather_multi_sessions_avoids_isce(db, table_name, setup_table):
+ """db.gather() in multi_sessions mode gives each task its own session β no isce."""
+ count_stmt = text(f"SELECT COUNT(*) FROM {table_name}")
+ rows_stmt = text(f"SELECT id, name FROM {table_name} LIMIT 10")
+
+ async def get_count():
+ result = await db.session.execute(count_stmt)
+ return result.scalar()
+
+ async def get_rows():
+ result = await db.session.execute(rows_stmt)
+ return result.fetchall()
+
+ async with db(multi_sessions=True, max_concurrent=2):
+ count, rows = await db.gather(get_count(), get_rows())
+
+ assert count == 50
+ assert len(rows) == 10
+
+
+@pytest.mark.asyncio
+async def test_sequential_execute_on_same_session_works(db, table_name, setup_table):
+ """Sequential awaits on the same session never trigger isce."""
+ async with db():
+ count_result = await db.session.execute(text(f"SELECT COUNT(*) FROM {table_name}"))
+ rows_result = await db.session.execute(text(f"SELECT id FROM {table_name} LIMIT 5"))
+
+ assert count_result.scalar() == 50
+ assert len(rows_result.fetchall()) == 5