diff --git a/bases/bot_detector/hiscore_scraper/core.py b/bases/bot_detector/hiscore_scraper/core.py index 6056498..9b58766 100644 --- a/bases/bot_detector/hiscore_scraper/core.py +++ b/bases/bot_detector/hiscore_scraper/core.py @@ -263,7 +263,12 @@ async def get_player_to_scrape( if result is None: logger.warning(f"[{worker_id}]: No player available.") return None - await player_ts_queue.commit() + commit_error = await player_ts_queue.commit() + if commit_error: + logger.error( + f"[{worker_id}]: Error committing player to scrape offset: {commit_error}" + ) + return None return result diff --git a/bases/bot_detector/runemetrics_scraper/core.py b/bases/bot_detector/runemetrics_scraper/core.py index 360a535..00af842 100644 --- a/bases/bot_detector/runemetrics_scraper/core.py +++ b/bases/bot_detector/runemetrics_scraper/core.py @@ -210,7 +210,11 @@ async def work( f"[{worker_id}]: Failed to requeue player: {produce_error}" ) else: - await player_nf_queue.commit() + commit_error = await player_nf_queue.commit() + if commit_error: + logger.error( + f"[{worker_id}]: Failed to commit requeued player offset: {commit_error}" + ) await asyncio.sleep(10) continue @@ -235,7 +239,11 @@ async def work( f"[{worker_id}]: Failed to requeue player: {produce_error}" ) else: - await player_nf_queue.commit() + commit_error = await player_nf_queue.commit() + if commit_error: + logger.error( + f"[{worker_id}]: Failed to commit requeued player offset: {commit_error}" + ) continue # push data to kafka @@ -246,7 +254,13 @@ async def work( f"[{worker_id}]: Failed to produce scraped player: {produce_error}" ) continue - await player_nf_queue.commit() + commit_error = await player_nf_queue.commit() + if commit_error: + logger.error( + f"[{worker_id}]: Failed to commit consumed player offset: {commit_error}" + ) + await asyncio.sleep(10) + continue logger.debug( f"[{worker_id}][{player_data.name}]: {player_data.label_jagex=}" ) diff --git a/bases/bot_detector/worker_hiscore/core.py b/bases/bot_detector/worker_hiscore/core.py index 53ed45d..c52d3f9 100644 --- a/bases/bot_detector/worker_hiscore/core.py +++ b/bases/bot_detector/worker_hiscore/core.py @@ -176,7 +176,11 @@ async def consume_many_task( await asyncio.sleep(15) continue - await player_sc_queue.commit() + commit_result = await player_sc_queue.commit() + if isinstance(commit_result, Exception): + logger.error(f"Failed to commit scraped batch: {commit_result}") + await asyncio.sleep(15) + continue # ideally we want batches to be as full as possible, this is more efficient on the database if len(batch) < 1000: @@ -189,7 +193,11 @@ async def consume_many_task( if isinstance(requeue_result, Exception): logger.error(f"Failed to requeue scraped batch: {requeue_result}") else: - await player_sc_queue.commit() + commit_result = await player_sc_queue.commit() + if isinstance(commit_result, Exception): + logger.error( + f"Failed to commit requeued scraped batch: {commit_result}" + ) await asyncio.sleep(15) diff --git a/bases/bot_detector/worker_ml/core.py b/bases/bot_detector/worker_ml/core.py index 204a548..bd91be9 100644 --- a/bases/bot_detector/worker_ml/core.py +++ b/bases/bot_detector/worker_ml/core.py @@ -137,7 +137,16 @@ async def consume_data_to_predict( ) await asyncio.sleep(15) continue - await data_to_predict_queue.commit() + commit_result = await data_to_predict_queue.commit() + if isinstance(commit_result, Exception): + logger.error( + { + "error": "failed to commit offset after requeue", + "reason": str(commit_result), + } + ) + await asyncio.sleep(15) + continue await asyncio.sleep(15) continue @@ -173,10 +182,27 @@ async def consume_data_to_predict( ) await asyncio.sleep(15) continue - await data_to_predict_queue.commit() + commit_result = await data_to_predict_queue.commit() + if isinstance(commit_result, Exception): + logger.error( + { + "error": "failed to commit offset after requeue", + "reason": str(commit_result), + } + ) + await asyncio.sleep(15) + continue await asyncio.sleep(15) continue - await data_to_predict_queue.commit() + commit_result = await data_to_predict_queue.commit() + if isinstance(commit_result, Exception): + logger.error( + { + "error": "failed to commit processed batch", + "reason": str(commit_result), + } + ) + await asyncio.sleep(15) async def consume_player_scraped( @@ -208,7 +234,16 @@ async def consume_player_scraped( if not input_data: logger.info("No valid highscore data to process. (input_data is empty)") - await player_sc_queue.commit() + commit_result = await player_sc_queue.commit() + if isinstance(commit_result, Exception): + logger.error( + { + "error": "failed to commit offset after requeue", + "reason": str(commit_result), + } + ) + await asyncio.sleep(15) + continue continue try: @@ -243,7 +278,16 @@ async def consume_player_scraped( ) await asyncio.sleep(15) continue - await player_sc_queue.commit() + commit_result = await player_sc_queue.commit() + if isinstance(commit_result, Exception): + logger.error( + { + "error": "failed to commit offset after requeue", + "reason": str(commit_result), + } + ) + await asyncio.sleep(15) + continue await asyncio.sleep(15) continue @@ -252,7 +296,16 @@ async def consume_player_scraped( session_factory=session_factory, predictions=combined_predictions, ) - await player_sc_queue.commit() + commit_result = await player_sc_queue.commit() + if isinstance(commit_result, Exception): + logger.error( + { + "error": "failed to commit processed batch", + "reason": str(commit_result), + } + ) + await asyncio.sleep(15) + continue except Exception as e: logger.error(f"Error consuming scrapes: {e}") logger.debug(f"Traceback: \n{traceback.format_exc()}") @@ -267,7 +320,16 @@ async def consume_player_scraped( ) await asyncio.sleep(15) continue - await player_sc_queue.commit() + commit_result = await player_sc_queue.commit() + if isinstance(commit_result, Exception): + logger.error( + { + "error": "failed to commit offset after requeue", + "reason": str(commit_result), + } + ) + await asyncio.sleep(15) + continue await asyncio.sleep(15) diff --git a/bases/bot_detector/worker_report/main.py b/bases/bot_detector/worker_report/main.py index c9e7dd2..5ece8b1 100644 --- a/bases/bot_detector/worker_report/main.py +++ b/bases/bot_detector/worker_report/main.py @@ -128,7 +128,10 @@ async def consume_many_task( logger.debug(f"Traceback: \n{traceback.format_exc()}") await asyncio.sleep(5) if should_commit: - await report_queue.commit() + commit_result = await report_queue.commit() + if isinstance(commit_result, Exception): + logger.error(f"Failed to commit processed reports: {commit_result}") + await asyncio.sleep(5) async def error_task( @@ -140,7 +143,10 @@ async def error_task( if not isinstance(report, ReportsToInsertStruct): logger.warning(f"invalid {report=}") continue - await report_queue.put(message=[report]) + put_result = await report_queue.put(message=[report]) + if isinstance(put_result, Exception): + logger.error(f"Failed to requeue report from error queue: {put_result}") + await asyncio.sleep(5) async def main(): diff --git a/components/bot_detector/event_queue/adapters/kafka/adapter.py b/components/bot_detector/event_queue/adapters/kafka/adapter.py index 94d8d42..2028137 100644 --- a/components/bot_detector/event_queue/adapters/kafka/adapter.py +++ b/components/bot_detector/event_queue/adapters/kafka/adapter.py @@ -197,7 +197,7 @@ async def commit(self) -> Optional[Exception]: return ConsumerNotStartedError( "Consumer is None, did you start the consumer?" ) - await self.consumer.commit() + return await self.consumer.commit() class AIOKafkaAdapter(QueueBackendProtocol[T]): @@ -223,8 +223,8 @@ async def stop(self) -> None: if self.producer: await self.producer.stop() - async def put(self, messages: list[T]) -> None: - await self.producer.put(messages) + async def put(self, messages: list[T]) -> Optional[Exception]: + return await self.producer.put(messages) async def get_one(self) -> Optional[T] | Exception: return await self.consumer.get_one() @@ -233,4 +233,4 @@ async def get_many(self, count: int) -> list[T] | Exception: return await self.consumer.get_many(count) async def commit(self) -> Optional[Exception]: - await self.consumer.commit() + return await self.consumer.commit() diff --git a/components/bot_detector/event_queue/core/event_queue.py b/components/bot_detector/event_queue/core/event_queue.py index 4849765..dbfdb51 100644 --- a/components/bot_detector/event_queue/core/event_queue.py +++ b/components/bot_detector/event_queue/core/event_queue.py @@ -32,8 +32,8 @@ async def start(self): async def stop(self): await self._backend.stop() - async def put(self, message: list[T]): - await self._backend.put(message) + async def put(self, message: list[T]) -> Optional[Exception]: + return await self._backend.put(message) class QueueConsumer(Generic[T]): @@ -64,7 +64,7 @@ async def get_one(self) -> Optional[T] | Exception: async def get_many(self, count: int) -> list[T] | Exception: return await self._backend.get_many(count) - async def commit(self): + async def commit(self) -> Optional[Exception]: return await self._backend.commit() diff --git a/test/components/bot_detector/event_queue/adapter_kafka/test_consumer_adapter_kafka.py b/test/components/bot_detector/event_queue/adapter_kafka/test_consumer_adapter_kafka.py index 5f73258..b459450 100644 --- a/test/components/bot_detector/event_queue/adapter_kafka/test_consumer_adapter_kafka.py +++ b/test/components/bot_detector/event_queue/adapter_kafka/test_consumer_adapter_kafka.py @@ -426,8 +426,29 @@ async def test_consumer_commit_success(): ) adapter = AIOKafkaConsumerAdapter(PlayerScraped, config) adapter.consumer = AsyncMock() - adapter.consumer.commit = AsyncMock() + adapter.consumer.commit = AsyncMock(return_value=None) - await adapter.commit() + result = await adapter.commit() adapter.consumer.commit.assert_awaited_once() + assert result is None + + +@pytest.mark.asyncio +async def test_consumer_commit_propagates_error_value(): + config = KafkaConfig( + topic="players", + bootstrap_servers="localhost:9092", + consumer=True, + producer=False, + consumer_config=KafkaConsumerConfig(group_id="group"), + producer_config=None, + ) + adapter = AIOKafkaConsumerAdapter(PlayerScraped, config) + adapter.consumer = AsyncMock() + commit_error = Exception("commit failed") + adapter.consumer.commit = AsyncMock(return_value=commit_error) + + result = await adapter.commit() + + assert result is commit_error diff --git a/test/components/bot_detector/event_queue/adapter_kafka/test_queue_adapter_kafka.py b/test/components/bot_detector/event_queue/adapter_kafka/test_queue_adapter_kafka.py index 0902faa..8551316 100644 --- a/test/components/bot_detector/event_queue/adapter_kafka/test_queue_adapter_kafka.py +++ b/test/components/bot_detector/event_queue/adapter_kafka/test_queue_adapter_kafka.py @@ -31,16 +31,16 @@ async def test_queue_adapter_wiring(): adapter.consumer.start = AsyncMock() adapter.producer.stop = AsyncMock() adapter.consumer.stop = AsyncMock() - adapter.producer.put = AsyncMock() + adapter.producer.put = AsyncMock(return_value=None) adapter.consumer.get_one = AsyncMock(return_value="one") adapter.consumer.get_many = AsyncMock(return_value=["many"]) - adapter.consumer.commit = AsyncMock() + adapter.consumer.commit = AsyncMock(return_value=None) await adapter.start() - await adapter.put([PlayerScraped(id=1, username="Alice", score=100)]) + put_result = await adapter.put([PlayerScraped(id=1, username="Alice", score=100)]) one = await adapter.get_one() many = await adapter.get_many(2) - await adapter.commit() + commit_result = await adapter.commit() await adapter.stop() adapter.producer.start.assert_awaited_once() @@ -53,3 +53,45 @@ async def test_queue_adapter_wiring(): adapter.producer.stop.assert_awaited_once() assert one == "one" assert many == ["many"] + assert put_result is None + assert commit_result is None + + +@pytest.mark.asyncio +async def test_queue_adapter_propagates_put_error() -> None: + config = KafkaConfig( + topic="players", + bootstrap_servers="localhost:9092", + consumer=True, + producer=True, + consumer_config=KafkaConsumerConfig(group_id="group"), + producer_config=KafkaProducerConfig(partition_key_fn=lambda _: "1"), + ) + adapter = AIOKafkaAdapter(PlayerScraped, config) + + producer_error = Exception("produce failed") + adapter.producer.put = AsyncMock(return_value=producer_error) + + result = await adapter.put([PlayerScraped(id=1, username="Alice", score=100)]) + + assert result is producer_error + + +@pytest.mark.asyncio +async def test_queue_adapter_propagates_commit_error() -> None: + config = KafkaConfig( + topic="players", + bootstrap_servers="localhost:9092", + consumer=True, + producer=True, + consumer_config=KafkaConsumerConfig(group_id="group"), + producer_config=KafkaProducerConfig(partition_key_fn=lambda _: "1"), + ) + adapter = AIOKafkaAdapter(PlayerScraped, config) + + commit_error = Exception("commit failed") + adapter.consumer.commit = AsyncMock(return_value=commit_error) + + result = await adapter.commit() + + assert result is commit_error