Skip to content
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
25 changes: 13 additions & 12 deletions verify_nzb.py
Original file line number Diff line number Diff line change
Expand Up @@ -473,7 +473,8 @@ def __init__(
self.servers = servers
self.retries = retries
self.progress_stream = progress_stream
self.jobs: deque[_Job] = deque()
self.global_jobs: deque[_Job] = deque()
self.server_jobs: list[deque[_Job]] = [deque() for _ in servers]
self.job_condition = asyncio.Condition()
self.connections: list[list[AsyncNntpConnection]] = [
[AsyncNntpConnection(server) for _ in range(server.max_connections)]
Expand Down Expand Up @@ -568,23 +569,23 @@ async def _worker_loop(self, server_index: int, connection: AsyncNntpConnection)
async def _take_job(self, server_index: int) -> _Job | None:
async with self.job_condition:
while True:
job = self._find_job_for_server(server_index)
if job is not None:
self.jobs.remove(job)
if self.server_jobs[server_index]:
job = self.server_jobs[server_index].popleft()
state = self.states[job.message_id]
state.queued = False
state.in_flight = True
return job
elif self.global_jobs:
job = self.global_jobs.popleft()
state = self.states[job.message_id]
state.queued = False
state.in_flight = True
return job

if self._shutdown:
return None
await self.job_condition.wait()

def _find_job_for_server(self, server_index: int) -> _Job | None:
for job in self.jobs:
if job.target_server_index is None or job.target_server_index == server_index:
return job
return None

async def _handle_job(
self,
server_index: int,
Expand Down Expand Up @@ -647,7 +648,7 @@ async def _enqueue_message(self, message_id: str) -> bool:
return False
state.queued = True
self._pending_messages += 1
self.jobs.append(_Job(message_id=message_id))
self.global_jobs.append(_Job(message_id=message_id))
self.job_condition.notify_all()
return True

Expand All @@ -657,7 +658,7 @@ def _defer_message_locked(self, message_id: str, server_index: int) -> None:
return
state.in_flight = False
state.queued = True
self.jobs.append(_Job(message_id=message_id, target_server_index=server_index))
self.server_jobs[server_index].append(_Job(message_id=message_id, target_server_index=server_index))
self.job_condition.notify_all()

def _next_server_index_locked(self, state: _MessageState, current_server_index: int) -> int | None:
Expand Down