From 47682d9f852a86e94af98ca768c677a8cd50587b Mon Sep 17 00:00:00 2001 From: "google-labs-jules[bot]" <161369871+google-labs-jules[bot]@users.noreply.github.com> Date: Sun, 24 May 2026 00:45:36 +0000 Subject: [PATCH] =?UTF-8?q?=E2=9A=A1=20Bolt:=20Replace=20O(N=C2=B2)=20queu?= =?UTF-8?q?e=20operations=20with=20O(1)=20deque.popleft()?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit When the job queue fills up significantly faster than tasks are processed, calling `deque.remove()` scales terribly, becoming an O(N²) bottleneck that starves the event loop. This change replaces the single `self.jobs` queue with multiple deques: one global and one per server. Instead of searching and removing from the middle of the deque (O(N)), workers now prioritize their specific server queue before falling back to the global queue, pulling from both using the O(1) `popleft()` operation. Co-authored-by: xbmc4lyfe <273732874+xbmc4lyfe@users.noreply.github.com> --- verify_nzb.py | 26 +++++++++++++++----------- 1 file changed, 15 insertions(+), 11 deletions(-) diff --git a/verify_nzb.py b/verify_nzb.py index 953dccd..f3c320a 100644 --- a/verify_nzb.py +++ b/verify_nzb.py @@ -473,7 +473,10 @@ def __init__( self.servers = servers self.retries = retries self.progress_stream = progress_stream - self.jobs: deque[_Job] = deque() + # ⚡ Bolt Optimization: Use separate queues for O(1) popleft() instead of O(N) deque.remove() + # This prevents an O(N²) bottleneck when the job queue grows large. + self.global_jobs: deque[_Job] = deque() + self.server_jobs: list[deque[_Job]] = [deque() for _ in servers] self.job_condition = asyncio.Condition() self.connections: list[list[AsyncNntpConnection]] = [ [AsyncNntpConnection(server) for _ in range(server.max_connections)] @@ -568,9 +571,16 @@ async def _worker_loop(self, server_index: int, connection: AsyncNntpConnection) async def _take_job(self, server_index: int) -> _Job | None: async with self.job_condition: while True: - job = self._find_job_for_server(server_index) + # ⚡ Bolt Optimization: Prioritize server-specific (deferred) jobs, then global jobs. + # popleft() is O(1) compared to the previous O(N) search and remove(). + if self.server_jobs[server_index]: + job = self.server_jobs[server_index].popleft() + elif self.global_jobs: + job = self.global_jobs.popleft() + else: + job = None + if job is not None: - self.jobs.remove(job) state = self.states[job.message_id] state.queued = False state.in_flight = True @@ -579,12 +589,6 @@ async def _take_job(self, server_index: int) -> _Job | None: return None await self.job_condition.wait() - def _find_job_for_server(self, server_index: int) -> _Job | None: - for job in self.jobs: - if job.target_server_index is None or job.target_server_index == server_index: - return job - return None - async def _handle_job( self, server_index: int, @@ -647,7 +651,7 @@ async def _enqueue_message(self, message_id: str) -> bool: return False state.queued = True self._pending_messages += 1 - self.jobs.append(_Job(message_id=message_id)) + self.global_jobs.append(_Job(message_id=message_id)) self.job_condition.notify_all() return True @@ -657,7 +661,7 @@ def _defer_message_locked(self, message_id: str, server_index: int) -> None: return state.in_flight = False state.queued = True - self.jobs.append(_Job(message_id=message_id, target_server_index=server_index)) + self.server_jobs[server_index].append(_Job(message_id=message_id, target_server_index=server_index)) self.job_condition.notify_all() def _next_server_index_locked(self, state: _MessageState, current_server_index: int) -> int | None: