Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,9 @@ This project adheres to [Semantic Versioning](https://semver.org/).

## [UNRELEASED]

### Added
- [#3826](https://github.com/plotly/dash/pull/3826) WebSocket callback dispatch no longer lets long-lived callbacks limit the number of concurrent users. Async callbacks (including session-persistent ones) run directly on the connection event loop instead of occupying a worker thread, and synchronous callbacks run on a shared `ThreadPoolExecutor` whose size is configurable via the new `websocket_max_workers` argument to `Dash` (default `4`). A synchronous persistent (no-output) callback now warns at registration since it would tie up a worker thread.

## Fixed
- [#3822](https://github.com/plotly/dash/pull/3822) Fix `UnboundLocalError` for `user_callback_output` in async background callbacks (Celery and Diskcache managers) when the callback raises `PreventUpdate` or another exception before the variable is assigned.
- [#3819](https://github.com/plotly/dash/pull/3819) Fix `RuntimeError: No active request in context` when a non-Dash path falls through to the FastAPI catch-all route. Fixes [#3812](https://github.com/plotly/dash/issues/3812).
Expand Down
14 changes: 14 additions & 0 deletions dash/_callback.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
import collections
import hashlib
import inspect
import warnings
from functools import wraps
from typing import Callable, Optional, Any, List, Tuple, Union, Dict, TypeVar, cast

Expand Down Expand Up @@ -880,6 +881,19 @@ async def async_add_context(*args, **kwargs):
if inspect.iscoroutinefunction(func):
callback_map[callback_id]["callback"] = async_add_context
else:
# A persistent, no-output callback streams via set_props and typically
# runs for the life of the connection. When synchronous it occupies a
# WebSocket worker thread the whole time and can exhaust the pool, so
# warn that it should be async (async callbacks run on the event loop).
if _kwargs.get("persistent") and not has_output:
warnings.warn(
f"persistent=True callback '{callback_id}' is synchronous and "
"has no Output; it will occupy a WebSocket worker thread for the "
"life of the connection and can exhaust the pool. Define it with "
"'async def' so it runs on the event loop instead.",
RuntimeWarning,
stacklevel=2,
)
callback_map[callback_id]["callback"] = add_context

return func
Expand Down
20 changes: 6 additions & 14 deletions dash/_callback_context.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,3 @@
import asyncio
import functools
import warnings
import json
Expand Down Expand Up @@ -367,20 +366,13 @@ def set_props(component_id: typing.Union[str, dict], props: dict):
"""
ws = _get_from_context("dash_websocket", None)
if ws is not None:
# Stream immediately via WebSocket
# Stream immediately via WebSocket. Queuing is synchronous and thread-safe
# (janus sync side), so we queue directly instead of scheduling a task. This
# avoids detached/orphaned tasks when the callback runs on the event loop and
# preserves ordering relative to the callback response.
_id = stringify_id(component_id)

async def _send_props():
for prop_name, value in props.items():
await ws.set_prop(_id, prop_name, value)

# If we're in an async context, schedule the coroutine
try:
asyncio.get_running_loop()
asyncio.ensure_future(_send_props())
except RuntimeError:
# No running event loop - run synchronously
asyncio.run(_send_props())
for prop_name, value in props.items():
ws.set_prop_sync(_id, prop_name, value)
else:
# Batch for response (existing behavior)
callback_context.set_props(component_id, props)
128 changes: 76 additions & 52 deletions dash/backends/_fastapi.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
import concurrent.futures
import json
import queue
from typing import TYPE_CHECKING, Any, Callable, Dict
from typing import TYPE_CHECKING, Any, Callable, Dict, List
import sys
import mimetypes
import hashlib
Expand Down Expand Up @@ -46,9 +46,9 @@
DashWebsocketCallback,
run_ws_sender,
run_callback_in_executor,
run_callback_on_loop,
make_callback_done_handler,
SHUTDOWN_SIGNAL,
DISCONNECTED,
shutdown_ws_connection,
)
from ._utils import format_traceback_html

Expand Down Expand Up @@ -263,8 +263,8 @@ def __init__(self, server: FastAPI):
self.error_handling_mode = "ignore"
self.request_adapter = FastAPIRequestAdapter
self.response_adapter = FastAPIResponseAdapter
self._before_request_funcs = []
self._after_request_func = None
self._before_request_funcs: List[Callable[[], Any]] = []
self._after_request_func: Callable[[], Any] | None = None
self._enable_timing = False

def __call__(self, *args: Any, **kwargs: Any):
Expand Down Expand Up @@ -685,7 +685,7 @@ def serve_websocket_callback(self, dash_app: "Dash"):
Args:
dash_app: The Dash application instance
"""
# pylint: disable=too-many-statements,too-many-locals
# pylint: disable=too-many-statements,too-many-locals,too-many-branches
ws_path = dash_app.config.routes_pathname_prefix + "_dash-ws-callback"

# Get allowed origins from dash app config
Expand Down Expand Up @@ -729,16 +729,28 @@ async def websocket_handler(websocket: WebSocket):

await websocket.accept()

# The connection's event loop, used to dispatch async callbacks as
# tasks and to resolve get_props futures.
loop = asyncio.get_running_loop()

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What do you think of enabling debug mode for the loop here (and in a similar place for Quart)? You could use the existing debug config variable to turn this on. This way, users would get warning when a callback is executing slowly. You could also add a way for users to configure loop.slow_callback_duration.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Good idea, maybe as a follow up.

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Another option (maybe in tandem) would be to add a watchdog thread outside of the pool that could keep an eye out for bottlenecked callbacks.


# Create janus queue for outbound messages (main loop context)
outbound_queue: janus.Queue[str] = janus.Queue()
# Track pending get_props requests with standard queue.Queue for responses
pending_get_props: Dict[str, queue.Queue] = {}
# Track pending get_props requests. Values are queue.Queue (threadpool /
# sync path) or asyncio.Future (event-loop / async path).
pending_get_props: Dict[str, queue.Queue | asyncio.Future] = {}
# Shutdown event to signal connection closure to worker threads
shutdown_event = threading.Event()
# Get thread pool executor
executor = self.get_callback_executor()
# Track pending callback futures
pending_callbacks: Dict[str, concurrent.futures.Future] = {}
# Sync callbacks run on a shared thread pool executor (async callbacks
# run directly on the event loop). A single bounded pool caps the total
# worker-thread count regardless of how many connections are open.
executor = self.get_callback_executor(
getattr(dash_app, "_websocket_max_workers", 4)
)
# Track pending callbacks: concurrent.futures.Future (sync/threadpool)
# or asyncio.Task (async/event-loop).
pending_callbacks: Dict[
str, concurrent.futures.Future | asyncio.Future
] = {}

# Start sender task to drain outbound queue (sends pre-serialized text)
# pylint: disable=protected-access
Expand Down Expand Up @@ -777,66 +789,78 @@ async def websocket_handler(websocket: WebSocket):
dash_app._websocket_callbacks,
)

# Create WebSocket callback instance
# Async callbacks (incl. persistent ones) run as tasks on
# the event loop; sync callbacks go to the threadpool.
# pylint: disable=protected-access

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think you can still remove these stale pylint comments.

cb_spec = dash_app.callback_map.get(payload.get("output"), {})
is_async = inspect.iscoroutinefunction(cb_spec.get("callback"))

# Create WebSocket callback instance. The loop is passed only
# for the async path so get_prop awaits instead of blocking.
ws_cb = DashWebsocketCallback(
pending_get_props,
renderer_id,
outbound_queue,
shutdown_event,
loop if is_async else None,
)

# Submit callback to executor
future = run_callback_in_executor(
executor,
dash_app,
payload,
ws_cb,
FastAPIResponseAdapter(),
done_handler = make_callback_done_handler(
outbound_queue,
pending_callbacks,
request_id,
renderer_id,
shutdown_event,
)

# Set up done callback to send response
future.add_done_callback(
make_callback_done_handler(
outbound_queue,
pending_callbacks,
request_id,
renderer_id,
shutdown_event,
if is_async:
task = asyncio.create_task(
run_callback_on_loop(
dash_app,
payload,
ws_cb,
FastAPIResponseAdapter(),
)
)
)
pending_callbacks[request_id] = future
task.add_done_callback(done_handler)
pending_callbacks[request_id] = task
else:
# Submit callback to executor
future = run_callback_in_executor(
executor,
dash_app,
payload,
ws_cb,
FastAPIResponseAdapter(),
)
# Set up done callback to send response
future.add_done_callback(done_handler)
pending_callbacks[request_id] = future

elif msg_type == "get_props_response":
# Put response in waiting queue (non-blocking)
# Resolve the waiting future (async path) or queue (thread
# path) for this request (non-blocking).
request_id = message.get("requestId")
response_queue = pending_get_props.get(request_id)
if response_queue is not None:
response_queue.put_nowait(message.get("payload"))
pending = pending_get_props.get(request_id)
if isinstance(pending, asyncio.Future):
if not pending.done():
pending.set_result(message.get("payload"))
elif pending is not None:
pending.put_nowait(message.get("payload"))

elif msg_type == "heartbeat":
outbound_queue.sync_q.put_nowait('{"type": "heartbeat_ack"}')

except WebSocketDisconnect:
pass # Clean disconnect
finally:
# Signal shutdown to worker threads
shutdown_event.set()
# Unblock any threads waiting on get_prop responses
for response_queue in pending_get_props.values():
response_queue.put_nowait(DISCONNECTED)
# Signal sender to shutdown and cancel it
outbound_queue.sync_q.put_nowait(SHUTDOWN_SIGNAL)
sender_task.cancel()
try:
await sender_task
except asyncio.CancelledError:
pass
# Close the janus queue
outbound_queue.close()
await outbound_queue.wait_closed()
# Cancel any pending futures
for f in pending_callbacks.values():
f.cancel()
await shutdown_ws_connection(
shutdown_event,
pending_get_props,
pending_callbacks,
outbound_queue,
sender_task,
)

self.server.add_api_websocket_route(ws_path, websocket_handler)

Expand Down
Loading
Loading