Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 8 additions & 6 deletions miles/router/router.py
Original file line number Diff line number Diff line change
Expand Up @@ -538,6 +538,12 @@ def _candidate_set(self) -> set[str]:
return self.enabled_workers - self.dead_workers
return set(self.worker_request_counts) - self.dead_workers

def _pick_least_loaded(self, candidates: set[str]) -> str:
    """Select the least-loaded candidate and increment its in-flight count.

    Args:
        candidates: Non-empty set of worker URLs to choose from.

    Returns:
        The candidate URL with the smallest recorded request count. A
        candidate with no recorded count is treated as having a count of 0.

    Raises:
        ValueError: If ``candidates`` is empty (propagated from ``min``).
    """
    counts = self.worker_request_counts
    url = min(candidates, key=lambda u: counts.get(u, 0))
    # Selection treats a missing entry as zero, so the increment must do the
    # same; a bare ``+= 1`` raises KeyError on a plain dict for a candidate
    # that has no count recorded yet.
    counts[url] = counts.get(url, 0) + 1
    return url

def _use_url(self):
"""Synchronous, raise-on-empty selector.

Expand All @@ -553,9 +559,7 @@ def _use_url(self):
candidates = self._candidate_set()
if not candidates:
raise RuntimeError("No enabled live workers available in the pool")
url = min(candidates, key=lambda u: self.worker_request_counts.get(u, 0))
self.worker_request_counts[url] += 1
return url
return self._pick_least_loaded(candidates)

async def _use_url_async(self):
"""C20 0-active suspend selector (production dispatch path).
Expand All @@ -574,9 +578,7 @@ async def _use_url_async(self):
async with self._workers_changed:
await self._workers_changed.wait_for(lambda: bool(self._candidate_set()))
candidates = self._candidate_set()
url = min(candidates, key=lambda u: self.worker_request_counts.get(u, 0))
self.worker_request_counts[url] += 1
return url
return self._pick_least_loaded(candidates)

async def _notify_workers_changed(self) -> None:
"""Wake every dispatcher suspended in :meth:`_use_url_async`.
Expand Down
8 changes: 8 additions & 0 deletions tests/fast/router/test_router.py
Original file line number Diff line number Diff line change
Expand Up @@ -142,6 +142,14 @@ def test_use_url_selects_min_load(self, router_factory):
assert selected == "http://w2:8000"
assert router.worker_request_counts["http://w2:8000"] == 3

def test_use_url_async_selects_min_load(self, router_factory):
    """The async selector returns the worker with the fewest in-flight requests and bumps its count."""
    least_loaded = "http://w2:8000"
    router = router_factory()
    router.worker_request_counts = {"http://w1:8000": 5, "http://w2:8000": 2, "http://w3:8000": 8}

    chosen = asyncio.run(router._use_url_async())

    # The pick must be the minimum-count worker, whose counter goes from 2 to 3.
    assert chosen == least_loaded
    assert router.worker_request_counts[least_loaded] == 3

def test_use_url_excludes_dead_workers(self, router_factory):
router = router_factory()
router.worker_request_counts = {"http://w1:8000": 5, "http://w2:8000": 1, "http://w3:8000": 3}
Expand Down