Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 6 additions & 1 deletion bases/bot_detector/hiscore_scraper/core.py
Original file line number Diff line number Diff line change
Expand Up @@ -263,7 +263,12 @@ async def get_player_to_scrape(
if result is None:
logger.warning(f"[{worker_id}]: No player available.")
return None
await player_ts_queue.commit()
commit_error = await player_ts_queue.commit()
if commit_error:
logger.error(
f"[{worker_id}]: Error committing player to scrape offset: {commit_error}"
)
return None
return result


Expand Down
20 changes: 17 additions & 3 deletions bases/bot_detector/runemetrics_scraper/core.py
Original file line number Diff line number Diff line change
Expand Up @@ -210,7 +210,11 @@ async def work(
f"[{worker_id}]: Failed to requeue player: {produce_error}"
)
else:
await player_nf_queue.commit()
commit_error = await player_nf_queue.commit()
if commit_error:
logger.error(
f"[{worker_id}]: Failed to commit requeued player offset: {commit_error}"
)
await asyncio.sleep(10)
continue

Expand All @@ -235,7 +239,11 @@ async def work(
f"[{worker_id}]: Failed to requeue player: {produce_error}"
)
else:
await player_nf_queue.commit()
commit_error = await player_nf_queue.commit()
if commit_error:
logger.error(
f"[{worker_id}]: Failed to commit requeued player offset: {commit_error}"
)
Comment on lines +242 to +246
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🟠 Major

Add backoff when commit fails in this branch.

At Line 242–Line 246, commit failures only log and then immediately re-enter the loop, which can cause tight retry churn on the same record.

💡 Suggested fix
                 else:
                     commit_error = await player_nf_queue.commit()
                     if commit_error:
                         logger.error(
                             f"[{worker_id}]: Failed to commit requeued player offset: {commit_error}"
                         )
+                        await asyncio.sleep(10)
                 continue
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@bases/bot_detector/runemetrics_scraper/core.py` around lines 242-246, the
commit-error branch for player_nf_queue.commit() currently only logs and
immediately continues, which can cause a tight retry loop. Modify the branch
that checks commit_error (and logs with logger.error and worker_id) so that it
waits before retrying. At minimum, add the fixed `await asyncio.sleep(10)` shown
in the suggested fix, which is the same delay used after the other commit
failures in this function; optionally replace it with an exponential backoff
that starts with a small delay, caps the maximum delay, and resets after a
successful commit. Use asyncio.sleep (or the existing async sleep helper) so the
loop yields and backs off on repeated commit failures.

continue

# push data to kafka
Expand All @@ -246,7 +254,13 @@ async def work(
f"[{worker_id}]: Failed to produce scraped player: {produce_error}"
)
continue
await player_nf_queue.commit()
commit_error = await player_nf_queue.commit()
if commit_error:
logger.error(
f"[{worker_id}]: Failed to commit consumed player offset: {commit_error}"
)
await asyncio.sleep(10)
continue
logger.debug(
f"[{worker_id}][{player_data.name}]: {player_data.label_jagex=}"
)
Expand Down
12 changes: 10 additions & 2 deletions bases/bot_detector/worker_hiscore/core.py
Original file line number Diff line number Diff line change
Expand Up @@ -176,7 +176,11 @@ async def consume_many_task(
await asyncio.sleep(15)
continue

await player_sc_queue.commit()
commit_result = await player_sc_queue.commit()
if isinstance(commit_result, Exception):
logger.error(f"Failed to commit scraped batch: {commit_result}")
await asyncio.sleep(15)
continue

# ideally we want batches to be as full as possible, this is more efficient on the database
if len(batch) < 1000:
Expand All @@ -189,7 +193,11 @@ async def consume_many_task(
if isinstance(requeue_result, Exception):
logger.error(f"Failed to requeue scraped batch: {requeue_result}")
else:
await player_sc_queue.commit()
commit_result = await player_sc_queue.commit()
if isinstance(commit_result, Exception):
logger.error(
f"Failed to commit requeued scraped batch: {commit_result}"
)
await asyncio.sleep(15)


Expand Down
76 changes: 69 additions & 7 deletions bases/bot_detector/worker_ml/core.py
Original file line number Diff line number Diff line change
Expand Up @@ -137,7 +137,16 @@ async def consume_data_to_predict(
)
await asyncio.sleep(15)
continue
await data_to_predict_queue.commit()
commit_result = await data_to_predict_queue.commit()
if isinstance(commit_result, Exception):
logger.error(
{
"error": "failed to commit offset after requeue",
"reason": str(commit_result),
}
)
await asyncio.sleep(15)
continue
await asyncio.sleep(15)
continue

Expand Down Expand Up @@ -173,10 +182,27 @@ async def consume_data_to_predict(
)
await asyncio.sleep(15)
continue
await data_to_predict_queue.commit()
commit_result = await data_to_predict_queue.commit()
if isinstance(commit_result, Exception):
logger.error(
{
"error": "failed to commit offset after requeue",
"reason": str(commit_result),
}
)
await asyncio.sleep(15)
continue
await asyncio.sleep(15)
continue
await data_to_predict_queue.commit()
commit_result = await data_to_predict_queue.commit()
if isinstance(commit_result, Exception):
logger.error(
{
"error": "failed to commit processed batch",
"reason": str(commit_result),
}
)
await asyncio.sleep(15)


async def consume_player_scraped(
Expand Down Expand Up @@ -208,7 +234,16 @@ async def consume_player_scraped(

if not input_data:
logger.info("No valid highscore data to process. (input_data is empty)")
await player_sc_queue.commit()
commit_result = await player_sc_queue.commit()
if isinstance(commit_result, Exception):
logger.error(
{
"error": "failed to commit offset after requeue",
"reason": str(commit_result),
}
)
await asyncio.sleep(15)
continue
continue

try:
Expand Down Expand Up @@ -243,7 +278,16 @@ async def consume_player_scraped(
)
await asyncio.sleep(15)
continue
await player_sc_queue.commit()
commit_result = await player_sc_queue.commit()
if isinstance(commit_result, Exception):
logger.error(
{
"error": "failed to commit offset after requeue",
"reason": str(commit_result),
}
)
await asyncio.sleep(15)
continue
await asyncio.sleep(15)
continue

Expand All @@ -252,7 +296,16 @@ async def consume_player_scraped(
session_factory=session_factory,
predictions=combined_predictions,
)
await player_sc_queue.commit()
commit_result = await player_sc_queue.commit()
if isinstance(commit_result, Exception):
logger.error(
{
"error": "failed to commit processed batch",
"reason": str(commit_result),
}
)
await asyncio.sleep(15)
continue
except Exception as e:
logger.error(f"Error consuming scrapes: {e}")
logger.debug(f"Traceback: \n{traceback.format_exc()}")
Expand All @@ -267,7 +320,16 @@ async def consume_player_scraped(
)
await asyncio.sleep(15)
continue
await player_sc_queue.commit()
commit_result = await player_sc_queue.commit()
if isinstance(commit_result, Exception):
logger.error(
{
"error": "failed to commit offset after requeue",
"reason": str(commit_result),
}
)
await asyncio.sleep(15)
continue
await asyncio.sleep(15)


Expand Down
10 changes: 8 additions & 2 deletions bases/bot_detector/worker_report/main.py
Original file line number Diff line number Diff line change
Expand Up @@ -128,7 +128,10 @@ async def consume_many_task(
logger.debug(f"Traceback: \n{traceback.format_exc()}")
await asyncio.sleep(5)
if should_commit:
await report_queue.commit()
commit_result = await report_queue.commit()
if isinstance(commit_result, Exception):
logger.error(f"Failed to commit processed reports: {commit_result}")
await asyncio.sleep(5)


async def error_task(
Expand All @@ -140,7 +143,10 @@ async def error_task(
if not isinstance(report, ReportsToInsertStruct):
logger.warning(f"invalid {report=}")
continue
await report_queue.put(message=[report])
put_result = await report_queue.put(message=[report])
if isinstance(put_result, Exception):
logger.error(f"Failed to requeue report from error queue: {put_result}")
await asyncio.sleep(5)
Comment on lines +146 to +149
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🟠 Major

Requeue failure here can drop reports from the in-memory error queue.

At Line 146, the item is already removed from error_queue; if Kafka put fails (Line 147), it is only logged/slept and effectively lost. Reinsert it for retry.

💡 Suggested fix
         put_result = await report_queue.put(message=[report])
         if isinstance(put_result, Exception):
             logger.error(f"Failed to requeue report from error queue: {put_result}")
+            await error_queue.put(report)
             await asyncio.sleep(5)
+            continue
📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
-        put_result = await report_queue.put(message=[report])
-        if isinstance(put_result, Exception):
-            logger.error(f"Failed to requeue report from error queue: {put_result}")
-            await asyncio.sleep(5)
+        put_result = await report_queue.put(message=[report])
+        if isinstance(put_result, Exception):
+            logger.error(f"Failed to requeue report from error queue: {put_result}")
+            await error_queue.put(report)
+            await asyncio.sleep(5)
+            continue
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@bases/bot_detector/worker_report/main.py` around lines 146 - 149, The requeue
logic currently drops a report when report_queue.put fails because the item has
already been removed from error_queue; update the failure branch so that if
put_result is an Exception you reinsert the same report back into the in-memory
error queue (e.g., await error_queue.put(report)) before logging/sleeping;
locate the code around report_queue.put(...) and the variable names report,
error_queue, report_queue and logger in worker_report/main.py and add an await
error_queue.put(report) (or equivalent retry-safe enqueue) inside the
isinstance(put_result, Exception) branch.



async def main():
Expand Down
8 changes: 4 additions & 4 deletions components/bot_detector/event_queue/adapters/kafka/adapter.py
Original file line number Diff line number Diff line change
Expand Up @@ -197,7 +197,7 @@ async def commit(self) -> Optional[Exception]:
return ConsumerNotStartedError(
"Consumer is None, did you start the consumer?"
)
await self.consumer.commit()
return await self.consumer.commit()


class AIOKafkaAdapter(QueueBackendProtocol[T]):
Expand All @@ -223,8 +223,8 @@ async def stop(self) -> None:
if self.producer:
await self.producer.stop()

async def put(self, messages: list[T]) -> None:
await self.producer.put(messages)
async def put(self, messages: list[T]) -> Optional[Exception]:
return await self.producer.put(messages)

async def get_one(self) -> Optional[T] | Exception:
return await self.consumer.get_one()
Expand All @@ -233,4 +233,4 @@ async def get_many(self, count: int) -> list[T] | Exception:
return await self.consumer.get_many(count)

async def commit(self) -> Optional[Exception]:
await self.consumer.commit()
return await self.consumer.commit()
6 changes: 3 additions & 3 deletions components/bot_detector/event_queue/core/event_queue.py
Original file line number Diff line number Diff line change
Expand Up @@ -32,8 +32,8 @@ async def start(self):
async def stop(self):
await self._backend.stop()

async def put(self, message: list[T]):
await self._backend.put(message)
async def put(self, message: list[T]) -> Optional[Exception]:
return await self._backend.put(message)


class QueueConsumer(Generic[T]):
Expand Down Expand Up @@ -64,7 +64,7 @@ async def get_one(self) -> Optional[T] | Exception:
async def get_many(self, count: int) -> list[T] | Exception:
return await self._backend.get_many(count)

async def commit(self):
async def commit(self) -> Optional[Exception]:
return await self._backend.commit()


Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -426,8 +426,29 @@ async def test_consumer_commit_success():
)
adapter = AIOKafkaConsumerAdapter(PlayerScraped, config)
adapter.consumer = AsyncMock()
adapter.consumer.commit = AsyncMock()
adapter.consumer.commit = AsyncMock(return_value=None)

await adapter.commit()
result = await adapter.commit()

adapter.consumer.commit.assert_awaited_once()
assert result is None


@pytest.mark.asyncio
async def test_consumer_commit_propagates_error_value():
    """Check that the consumer adapter returns the commit error it receives.

    The underlying consumer's ``commit`` is mocked to *return* (not raise) an
    exception object; ``AIOKafkaConsumerAdapter.commit`` must hand that exact
    object back to the caller.
    """
    config = KafkaConfig(
        topic="players",
        bootstrap_servers="localhost:9092",
        consumer=True,
        producer=False,
        consumer_config=KafkaConsumerConfig(group_id="group"),
        producer_config=None,
    )
    adapter = AIOKafkaConsumerAdapter(PlayerScraped, config)
    adapter.consumer = AsyncMock()
    commit_error = Exception("commit failed")
    adapter.consumer.commit = AsyncMock(return_value=commit_error)

    result = await adapter.commit()

    # The commit must be delegated to the underlying consumer exactly once.
    adapter.consumer.commit.assert_awaited_once()
    # Identity check: the very same error object is returned, not a copy or wrapper.
    assert result is commit_error
Original file line number Diff line number Diff line change
Expand Up @@ -31,16 +31,16 @@ async def test_queue_adapter_wiring():
adapter.consumer.start = AsyncMock()
adapter.producer.stop = AsyncMock()
adapter.consumer.stop = AsyncMock()
adapter.producer.put = AsyncMock()
adapter.producer.put = AsyncMock(return_value=None)
adapter.consumer.get_one = AsyncMock(return_value="one")
adapter.consumer.get_many = AsyncMock(return_value=["many"])
adapter.consumer.commit = AsyncMock()
adapter.consumer.commit = AsyncMock(return_value=None)

await adapter.start()
await adapter.put([PlayerScraped(id=1, username="Alice", score=100)])
put_result = await adapter.put([PlayerScraped(id=1, username="Alice", score=100)])
one = await adapter.get_one()
many = await adapter.get_many(2)
await adapter.commit()
commit_result = await adapter.commit()
await adapter.stop()

adapter.producer.start.assert_awaited_once()
Expand All @@ -53,3 +53,45 @@ async def test_queue_adapter_wiring():
adapter.producer.stop.assert_awaited_once()
assert one == "one"
assert many == ["many"]
assert put_result is None
assert commit_result is None


@pytest.mark.asyncio
async def test_queue_adapter_propagates_put_error() -> None:
    """Check that ``AIOKafkaAdapter.put`` returns the producer's error value.

    The producer's ``put`` is mocked to *return* (not raise) an exception
    object; the adapter must forward the messages to it and return that exact
    object to the caller.
    """
    config = KafkaConfig(
        topic="players",
        bootstrap_servers="localhost:9092",
        consumer=True,
        producer=True,
        consumer_config=KafkaConsumerConfig(group_id="group"),
        producer_config=KafkaProducerConfig(partition_key_fn=lambda _: "1"),
    )
    adapter = AIOKafkaAdapter(PlayerScraped, config)

    producer_error = Exception("produce failed")
    adapter.producer.put = AsyncMock(return_value=producer_error)
    messages = [PlayerScraped(id=1, username="Alice", score=100)]

    result = await adapter.put(messages)

    # The same message list must reach the producer, exactly once.
    adapter.producer.put.assert_awaited_once_with(messages)
    # Identity check: the very same error object is returned, not a copy or wrapper.
    assert result is producer_error


@pytest.mark.asyncio
async def test_queue_adapter_propagates_commit_error() -> None:
    """Check that ``AIOKafkaAdapter.commit`` returns the consumer's error value.

    The consumer's ``commit`` is mocked to *return* (not raise) an exception
    object; the adapter must delegate to it and return that exact object to
    the caller.
    """
    config = KafkaConfig(
        topic="players",
        bootstrap_servers="localhost:9092",
        consumer=True,
        producer=True,
        consumer_config=KafkaConsumerConfig(group_id="group"),
        producer_config=KafkaProducerConfig(partition_key_fn=lambda _: "1"),
    )
    adapter = AIOKafkaAdapter(PlayerScraped, config)

    commit_error = Exception("commit failed")
    adapter.consumer.commit = AsyncMock(return_value=commit_error)

    result = await adapter.commit()

    # The commit must be delegated to the underlying consumer exactly once.
    adapter.consumer.commit.assert_awaited_once()
    # Identity check: the very same error object is returned, not a copy or wrapper.
    assert result is commit_error