From 2ea7f01912135b142c56cfa290f4bcbe43f8e127 Mon Sep 17 00:00:00 2001 From: "google-labs-jules[bot]" <161369871+google-labs-jules[bot]@users.noreply.github.com> Date: Tue, 26 May 2026 00:18:03 +0000 Subject: [PATCH] =?UTF-8?q?=E2=9A=A1=20Bolt:=20Replace=20O(N=C2=B2)=20job?= =?UTF-8?q?=20queue=20search=20with=20O(1)=20segregated=20deques?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Co-authored-by: xbmc4lyfe <273732874+xbmc4lyfe@users.noreply.github.com> --- verify_nzb.py | 25 +++++++++++++------------ 1 file changed, 13 insertions(+), 12 deletions(-) diff --git a/verify_nzb.py b/verify_nzb.py index 953dccd..0b9c1f7 100644 --- a/verify_nzb.py +++ b/verify_nzb.py @@ -473,7 +473,8 @@ def __init__( self.servers = servers self.retries = retries self.progress_stream = progress_stream - self.jobs: deque[_Job] = deque() + self.global_jobs: deque[_Job] = deque() + self.server_jobs: list[deque[_Job]] = [deque() for _ in servers] self.job_condition = asyncio.Condition() self.connections: list[list[AsyncNntpConnection]] = [ [AsyncNntpConnection(server) for _ in range(server.max_connections)] @@ -568,23 +569,23 @@ async def _worker_loop(self, server_index: int, connection: AsyncNntpConnection) async def _take_job(self, server_index: int) -> _Job | None: async with self.job_condition: while True: - job = self._find_job_for_server(server_index) - if job is not None: - self.jobs.remove(job) + if self.server_jobs[server_index]: + job = self.server_jobs[server_index].popleft() state = self.states[job.message_id] state.queued = False state.in_flight = True return job + elif self.global_jobs: + job = self.global_jobs.popleft() + state = self.states[job.message_id] + state.queued = False + state.in_flight = True + return job + if self._shutdown: return None await self.job_condition.wait() - def _find_job_for_server(self, server_index: int) -> _Job | None: - for job in self.jobs: - if job.target_server_index is None or job.target_server_index == server_index: - return job - return None - async def _handle_job( self, server_index: int, @@ -647,7 +648,7 @@ async def _enqueue_message(self, message_id: str) -> bool: return False state.queued = True self._pending_messages += 1 - self.jobs.append(_Job(message_id=message_id)) + self.global_jobs.append(_Job(message_id=message_id)) self.job_condition.notify_all() return True @@ -657,7 +658,7 @@ def _defer_message_locked(self, message_id: str, server_index: int) -> None: return state.in_flight = False state.queued = True - self.jobs.append(_Job(message_id=message_id, target_server_index=server_index)) + self.server_jobs[server_index].append(_Job(message_id=message_id, target_server_index=server_index)) self.job_condition.notify_all() def _next_server_index_locked(self, state: _MessageState, current_server_index: int) -> int | None: