We read every piece of feedback, and take your input very seriously.
To see all available qualifiers, see our documentation.
There was an error while loading. Please reload this page.
1 parent 8b4bbec commit aa144e6Copy full SHA for aa144e6
1 file changed
taskiq/receiver/receiver.py
@@ -436,9 +436,14 @@ async def prefetcher( # noqa: C901
436
break
437
except Exception:
438
logger.exception("Error while prefetching.")
439
- current_message = None
440
- iterator = self.broker.listen()
441
- self.sem_prefetch.release()
+ # current_message set => fetch failed before enqueue, so we
+ # still own the permit and a (possibly broken) iterator.
+ # Otherwise it's queued and the runner owns the permit;
442
+ # releasing here would leak a prefetch slot.
443
+ if current_message is not None:
444
+ current_message = None
445
+ iterator = self.broker.listen()
446
+ self.sem_prefetch.release()
447
continue
448
finally:
449
# We don't want to fetch new messages if we are shutting down.
0 commit comments