diff --git a/verify_nzb.py b/verify_nzb.py index 953dccd..f3c320a 100644 --- a/verify_nzb.py +++ b/verify_nzb.py @@ -473,7 +473,10 @@ def __init__( self.servers = servers self.retries = retries self.progress_stream = progress_stream - self.jobs: deque[_Job] = deque() + # ⚡ Bolt Optimization: Use separate queues for O(1) popleft() instead of O(N) deque.remove() + # This prevents an O(N²) bottleneck when the job queue grows large. + self.global_jobs: deque[_Job] = deque() + self.server_jobs: list[deque[_Job]] = [deque() for _ in servers] self.job_condition = asyncio.Condition() self.connections: list[list[AsyncNntpConnection]] = [ [AsyncNntpConnection(server) for _ in range(server.max_connections)] @@ -568,9 +571,16 @@ async def _worker_loop(self, server_index: int, connection: AsyncNntpConnection) async def _take_job(self, server_index: int) -> _Job | None: async with self.job_condition: while True: - job = self._find_job_for_server(server_index) + # ⚡ Bolt Optimization: Prioritize server-specific (deferred) jobs, then global jobs. + # popleft() is O(1) compared to the previous O(N) search and remove(). + if self.server_jobs[server_index]: + job = self.server_jobs[server_index].popleft() + elif self.global_jobs: + job = self.global_jobs.popleft() + else: + job = None + if job is not None: - self.jobs.remove(job) state = self.states[job.message_id] state.queued = False state.in_flight = True @@ -579,12 +589,6 @@ async def _take_job(self, server_index: int) -> _Job | None: return None await self.job_condition.wait() - def _find_job_for_server(self, server_index: int) -> _Job | None: - for job in self.jobs: - if job.target_server_index is None or job.target_server_index == server_index: - return job - return None - async def _handle_job( self, server_index: int, @@ -647,7 +651,7 @@ async def _enqueue_message(self, message_id: str) -> bool: return False state.queued = True self._pending_messages += 1 - self.jobs.append(_Job(message_id=message_id)) + self.global_jobs.append(_Job(message_id=message_id)) self.job_condition.notify_all() return True @@ -657,7 +661,7 @@ def _defer_message_locked(self, message_id: str, server_index: int) -> None: return state.in_flight = False state.queued = True - self.jobs.append(_Job(message_id=message_id, target_server_index=server_index)) + self.server_jobs[server_index].append(_Job(message_id=message_id, target_server_index=server_index)) self.job_condition.notify_all() def _next_server_index_locked(self, state: _MessageState, current_server_index: int) -> int | None: