This repository was archived by the owner on Aug 19, 2025. It is now read-only.

Postgres: Cannot perform operation: another operation is in progress #22

@kellen

Using broadcaster with FastAPI, I get an exception when publishing through broadcaster from a websocket endpoint.

Relevant parts:

    await self._conn.execute("SELECT pg_notify($1, $2);", channel, message)
    # ...
    'cannot perform operation: another operation is in progress')

Full trace:

INFO:     ('127.0.0.1', 52654) - "WebSocket /viewers" [accepted]
ERROR:    Exception in ASGI application
Traceback (most recent call last):
  File ".../site-packages/uvicorn/protocols/websockets/websockets_impl.py", line 154, in run_asgi
    result = await self.app(self.scope, self.asgi_receive, self.asgi_send)
  File ".../site-packages/uvicorn/middleware/proxy_headers.py", line 45, in __call__
    return await self.app(scope, receive, send)
  File ".../site-packages/fastapi/applications.py", line 179, in __call__
    await super().__call__(scope, receive, send)
  File ".../site-packages/starlette/applications.py", line 111, in __call__
    await self.middleware_stack(scope, receive, send)
  File ".../site-packages/starlette/middleware/errors.py", line 146, in __call__
    await self.app(scope, receive, send)
  File ".../site-packages/starlette/exceptions.py", line 58, in __call__
    await self.app(scope, receive, send)
  File ".../site-packages/starlette/routing.py", line 566, in __call__
    await route.handle(scope, receive, send)
  File ".../site-packages/starlette/routing.py", line 283, in handle
    await self.app(scope, receive, send)
  File ".../site-packages/starlette/routing.py", line 57, in app
    await func(session)
  File ".../site-packages/fastapi/routing.py", line 228, in app
    await dependant.call(**values)
  File "./main.py", line 198, in events_ws
    (viewers_ws_sender, {"websocket": websocket}),
  File ".../site-packages/starlette/concurrency.py", line 18, in run_until_first_complete
    [task.result() for task in done]
  File ".../site-packages/starlette/concurrency.py", line 18, in <listcomp>
    [task.result() for task in done]
  File "./main.py", line 204, in viewers_ws_receiver
    await broadcast.publish(channel="viewers", message=message)
  File ".../site-packages/broadcaster/_base.py", line 72, in publish
    await self._backend.publish(channel, message)
  File ".../site-packages/broadcaster/_backends/postgres.py", line 25, in publish
    await self._conn.execute("SELECT pg_notify($1, $2);", channel, message)
  File ".../site-packages/asyncpg/connection.py", line 297, in execute
    _, status, _ = await self._execute(query, args, 0, timeout, True)
  File ".../site-packages/asyncpg/connection.py", line 1444, in _execute
    with self._stmt_exclusive_section:
  File ".../site-packages/asyncpg/connection.py", line 1891, in __enter__
    'cannot perform operation: another operation is in progress')
asyncpg.exceptions._base.InterfaceError: cannot perform operation: another operation is in progress
INFO:     ('127.0.0.1', 52655) - "WebSocket /events" [accepted]
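
For illustration, the trace suggests the connection guards each statement with an exclusive section that raises instead of waiting when a second operation arrives. The sketch below is a toy model of that behavior, not asyncpg's actual code: `ExclusiveSection` and `FakeConnection` are hypothetical names, `RuntimeError` stands in for `InterfaceError`, and `asyncio.sleep` stands in for the database round trip.

```python
import asyncio


class ExclusiveSection:
    """Toy guard: raises (rather than waits) if entered while already held."""

    def __init__(self):
        self._held = False

    def __enter__(self):
        if self._held:
            raise RuntimeError(
                "cannot perform operation: another operation is in progress"
            )
        self._held = True

    def __exit__(self, *exc_info):
        self._held = False


class FakeConnection:
    """Hypothetical stand-in for a single shared database connection."""

    def __init__(self):
        self._section = ExclusiveSection()

    async def execute(self, query, *args):
        with self._section:
            # Stands in for the network round trip; other tasks run here.
            await asyncio.sleep(0.01)
            return "OK"


async def demo():
    conn = FakeConnection()
    # Two tasks use the same connection at the same time.
    return await asyncio.gather(
        conn.execute("SELECT pg_notify($1, $2);", "viewers", "a"),
        conn.execute("SELECT pg_notify($1, $2);", "viewers", "b"),
        return_exceptions=True,
    )


results = asyncio.run(demo())
```

In this model the first call succeeds and the second fails immediately, which matches the shape of the error above: any two coroutines that reach the shared connection while one of them is still awaiting would trigger it, with or without an application-level lock.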

The code looks roughly like this. I think the lock is just slowing things down and exposing the contention:

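import threading

from fastapi import WebSocket
from starlette.concurrency import run_until_first_complete

# `app` (the FastAPI instance) and `broadcast` (the broadcaster Broadcast
# instance using the Postgres backend) are created elsewhere in main.py.

@app.websocket("/viewers", name="viewers_ws")
async def events_ws(websocket: WebSocket):
    await websocket.accept()
    # Run receiver and sender concurrently; return when either one finishes.
    await run_until_first_complete(
        (viewers_ws_receiver, {"websocket": websocket}),
        (viewers_ws_sender, {"websocket": websocket}),
    )

# Blocking (non-async) lock; only held around synchronous code below.
my_lock = threading.Lock()

async def viewers_ws_receiver(websocket: WebSocket):
    # Forward every incoming websocket message to the "viewers" channel.
    async for message in websocket.iter_text():
        await broadcast.publish(channel="viewers", message=message)

async def viewers_ws_sender(websocket: WebSocket):
    # Push the current viewer count to the client on every broadcast event.
    async with broadcast.subscribe(channel="viewers") as subscriber:
        async for event in subscriber:
            counter = 0
            with my_lock:
                # do something with event.message
                counter = ...
            await websocket.send_json({"viewers": counter})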

Update: I refactored this down to a single websocket with no lock at all and saw the exception again. 🤷
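
One possible workaround, sketched here under assumptions, is to serialize `publish` calls behind an `asyncio.Lock` so that only one is in flight at a time. `SerializedPublisher` and `fake_publish` are hypothetical names used for illustration; in the real app the wrapped callable would be `broadcast.publish`. This is untested against broadcaster itself, and it would not help if the backend's subscribe path also uses the same connection while a publish is running (the `self._conn` in the trace hints at a single connection but does not prove it).

```python
import asyncio


class SerializedPublisher:
    """Hypothetical wrapper: lets only one publish call run at a time."""

    def __init__(self, publish):
        self._publish = publish
        # Create this object inside a running event loop so the lock is
        # bound to the right loop on older Python versions.
        self._lock = asyncio.Lock()

    async def publish(self, channel, message):
        async with self._lock:
            await self._publish(channel=channel, message=message)


async def demo():
    in_flight = 0
    peak = 0

    async def fake_publish(channel, message):
        # Stand-in for broadcast.publish; records peak concurrency.
        nonlocal in_flight, peak
        in_flight += 1
        peak = max(peak, in_flight)
        await asyncio.sleep(0.01)
        in_flight -= 1

    publisher = SerializedPublisher(fake_publish)
    await asyncio.gather(
        *(publisher.publish("viewers", str(i)) for i in range(5))
    )
    return peak


peak = asyncio.run(demo())
```

The lock has to be an `asyncio.Lock` rather than a `threading.Lock`: it is held across an `await`, and waiting on a threading lock there would block the event loop instead of yielding to the task that holds it.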
