Propagate Kafka queue put/commit errors to callers #94
Conversation
📝 Walkthrough
The pull request systematically enhances error handling across queue operations by modifying adapter methods to return `Optional[Exception]` error values instead of discarding failures.
Changes
Sequence Diagram(s)
sequenceDiagram
participant Worker as Worker
participant Queue as Queue Interface
participant Adapter as Kafka Adapter
participant Kafka as Kafka Broker
rect rgba(100, 150, 200, 0.5)
Note over Worker,Kafka: Error Handling Flow - Before
Worker->>Queue: commit()
Queue->>Adapter: commit()
Adapter->>Kafka: commit()
Kafka-->>Adapter: OK
Adapter-->>Queue: (no return)
Queue-->>Worker: (no return)
Worker->>Worker: Continue execution
end
rect rgba(150, 200, 100, 0.5)
Note over Worker,Kafka: Error Handling Flow - After
Worker->>Queue: commit()
Queue->>Adapter: commit()
Adapter->>Kafka: commit()
alt Success
Kafka-->>Adapter: OK
Adapter-->>Queue: None
Queue-->>Worker: None
Worker->>Worker: Continue execution
else Error Occurs
Kafka-->>Adapter: Exception
Adapter-->>Queue: Exception
Queue-->>Worker: Exception
Worker->>Worker: Log error
Worker->>Worker: Sleep
Worker->>Worker: Retry loop
end
end
Estimated Code Review Effort: 🎯 3 (Moderate) | ⏱️ ~25 minutes
Possibly Related PRs
🚥 Pre-merge checks | ✅ 2 | ❌ 1
❌ Failed checks (1 warning)
✅ Passed checks (2 passed)
Actionable comments posted: 2
Caution
Some comments are outside the diff and can’t be posted inline due to platform limitations.
⚠️ Outside diff range comments (1)
components/bot_detector/event_queue/adapters/kafka/adapter.py (1)
Lines 195-200: ⚠️ Potential issue | 🟠 Major
`commit()` must catch exceptions from aiokafka and return them as error values.
`AIOKafkaConsumer.commit()` in aiokafka 0.12.0 raises exceptions (CommitFailedError, IllegalOperation, IllegalStateError, KafkaError) on failure instead of returning error values. Line 200 directly awaits without handling, causing exceptions to propagate and crash callers expecting `Optional[Exception]` return values.
Suggested fix
 async def commit(self) -> Optional[Exception]:
     if self.consumer is None:
         return ConsumerNotStartedError(
             "Consumer is None, did you start the consumer?"
         )
-    return await self.consumer.commit()
+    try:
+        await self.consumer.commit()
+        return None
+    except Exception as e:
+        return e

Also applies to: line 236
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@components/bot_detector/event_queue/adapters/kafka/adapter.py` around lines 195 - 200, The commit() method currently awaits self.consumer.commit() directly which lets aiokafka exceptions (CommitFailedError, IllegalOperation, IllegalStateError, KafkaError) propagate; change commit() in the adapter to wrap the await in a try/except that catches Exception (or the specific aiokafka exceptions) and returns the caught exception instead of raising it, keeping the existing ConsumerNotStartedError return when consumer is None; apply the same try/except-and-return-exception pattern to the other adapter method that awaits self.consumer.commit() near line 236 so both places return Optional[Exception] on failure rather than throwing.
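The error-as-value pattern this comment asks for can be sketched end to end with a stand-in consumer. `FakeConsumer`, the `Adapter` class, and the `RuntimeError` message here are illustrative stand-ins, not the real aiokafka types; only the `commit()` control flow mirrors the suggested fix:

```python
import asyncio
from typing import Optional


class ConsumerNotStartedError(Exception):
    """Returned (not raised) when commit() is called before start."""


class FakeConsumer:
    """Stand-in for aiokafka's AIOKafkaConsumer; fails on demand."""

    def __init__(self, fail: bool = False) -> None:
        self.fail = fail

    async def commit(self) -> None:
        if self.fail:
            raise RuntimeError("CommitFailedError: rebalance in progress")


class Adapter:
    def __init__(self, consumer: Optional[FakeConsumer]) -> None:
        self.consumer = consumer

    async def commit(self) -> Optional[Exception]:
        if self.consumer is None:
            return ConsumerNotStartedError(
                "Consumer is None, did you start the consumer?"
            )
        try:
            await self.consumer.commit()
            return None
        except Exception as e:  # convert the raised exception to a value
            return e


ok = asyncio.run(Adapter(FakeConsumer()).commit())          # None on success
err = asyncio.run(Adapter(FakeConsumer(fail=True)).commit())  # the exception
```

Callers can then branch on the returned value instead of wrapping every call site in try/except.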
🧹 Nitpick comments (1)
test/components/bot_detector/event_queue/adapter_kafka/test_queue_adapter_kafka.py (1)
Lines 62-69: Consider extracting the duplicated `KafkaConfig` setup into a fixture/helper.
The same config block appears multiple times, which makes tests noisier and harder to update consistently.
♻️ Suggested refactor
+def _build_kafka_config() -> KafkaConfig:
+    return KafkaConfig(
+        topic="players",
+        bootstrap_servers="localhost:9092",
+        consumer=True,
+        producer=True,
+        consumer_config=KafkaConsumerConfig(group_id="group"),
+        producer_config=KafkaProducerConfig(partition_key_fn=lambda _: "1"),
+    )
+
 @pytest.mark.asyncio
 async def test_queue_adapter_propagates_put_error() -> None:
-    config = KafkaConfig(
-        topic="players",
-        bootstrap_servers="localhost:9092",
-        consumer=True,
-        producer=True,
-        consumer_config=KafkaConsumerConfig(group_id="group"),
-        producer_config=KafkaProducerConfig(partition_key_fn=lambda _: "1"),
-    )
+    config = _build_kafka_config()
     adapter = AIOKafkaAdapter(PlayerScraped, config)

Also applies to: lines 82-89
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@test/components/bot_detector/event_queue/adapter_kafka/test_queue_adapter_kafka.py` around lines 62 - 69, The KafkaConfig construction is duplicated across tests (e.g., the KafkaConfig(...) block used in test_queue_adapter_kafka.py around the config variable); extract that repeated setup into a single reusable fixture or helper function (e.g., a pytest fixture named kafka_config or a helper build_kafka_config()) and replace the inline KafkaConfig instantiations with references to that fixture/helper; ensure the fixture returns a KafkaConfig instance with the same fields (topic, bootstrap_servers, consumer/producer flags, consumer_config, producer_config) so all tests (including the other occurrence at lines 82-89) use the centralized config and remain consistent.
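A minimal sketch of the suggested extraction, using `dataclass` stand-ins for the real config classes (the field names are taken from the diff above; the stand-in class bodies themselves are hypothetical):

```python
from dataclasses import dataclass
from typing import Callable


# Illustrative stand-ins for the project's real config classes.
@dataclass
class KafkaConsumerConfig:
    group_id: str


@dataclass
class KafkaProducerConfig:
    partition_key_fn: Callable[[object], str]


@dataclass
class KafkaConfig:
    topic: str
    bootstrap_servers: str
    consumer: bool
    producer: bool
    consumer_config: KafkaConsumerConfig
    producer_config: KafkaProducerConfig


def build_kafka_config() -> KafkaConfig:
    """Single place to build the shared test config; tests call this
    instead of repeating the KafkaConfig(...) literal."""
    return KafkaConfig(
        topic="players",
        bootstrap_servers="localhost:9092",
        consumer=True,
        producer=True,
        consumer_config=KafkaConsumerConfig(group_id="group"),
        producer_config=KafkaProducerConfig(partition_key_fn=lambda _: "1"),
    )


cfg = build_kafka_config()
```

A `@pytest.fixture` returning the same object works equally well; the helper form is shown because it needs no pytest machinery to demonstrate.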
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.
Inline comments:
In `@bases/bot_detector/runemetrics_scraper/core.py`:
- Around line 242-246: The commit error branch for player_nf_queue.commit()
currently only logs and immediately continues, which can cause tight retry
loops; modify the branch handling the result of player_nf_queue.commit() (the
block that checks commit_error and logs with logger.error and worker_id) to
perform an exponential backoff (e.g., async sleep) before retrying, starting
with a small delay and capping the maximum delay, and reset the backoff on a
successful commit; implement this using asyncio.sleep (or the existing async
sleep helper) in that branch so the loop yields and backs off on repeated commit
failures.
In `@bases/bot_detector/worker_report/main.py`:
- Around line 146-149: The requeue logic currently drops a report when
report_queue.put fails because the item has already been removed from
error_queue; update the failure branch so that if put_result is an Exception you
reinsert the same report back into the in-memory error queue (e.g., await
error_queue.put(report)) before logging/sleeping; locate the code around
report_queue.put(...) and the variable names report, error_queue, report_queue
and logger in worker_report/main.py and add an await error_queue.put(report) (or
equivalent retry-safe enqueue) inside the isinstance(put_result, Exception)
branch.
---
Outside diff comments:
In `@components/bot_detector/event_queue/adapters/kafka/adapter.py`:
- Around line 195-200: The commit() method currently awaits
self.consumer.commit() directly which lets aiokafka exceptions
(CommitFailedError, IllegalOperation, IllegalStateError, KafkaError) propagate;
change commit() in the adapter to wrap the await in a try/except that catches
Exception (or the specific aiokafka exceptions) and returns the caught exception
instead of raising it, keeping the existing ConsumerNotStartedError return when
consumer is None; apply the same try/except-and-return-exception pattern to the
other adapter method that awaits self.consumer.commit() near line 236 so both
places return Optional[Exception] on failure rather than throwing.
---
Nitpick comments:
In
`@test/components/bot_detector/event_queue/adapter_kafka/test_queue_adapter_kafka.py`:
- Around line 62-69: The KafkaConfig construction is duplicated across tests
(e.g., the KafkaConfig(...) block used in test_queue_adapter_kafka.py around the
config variable); extract that repeated setup into a single reusable fixture or
helper function (e.g., a pytest fixture named kafka_config or a helper
build_kafka_config()) and replace the inline KafkaConfig instantiations with
references to that fixture/helper; ensure the fixture returns a KafkaConfig
instance with the same fields (topic, bootstrap_servers, consumer/producer
flags, consumer_config, producer_config) so all tests (including the other
occurrence at lines 82-89) use the centralized config and remain consistent.
ℹ️ Review info
Configuration used: Organization UI
Review profile: CHILL
Plan: Pro
📒 Files selected for processing (9)
bases/bot_detector/hiscore_scraper/core.py
bases/bot_detector/runemetrics_scraper/core.py
bases/bot_detector/worker_hiscore/core.py
bases/bot_detector/worker_ml/core.py
bases/bot_detector/worker_report/main.py
components/bot_detector/event_queue/adapters/kafka/adapter.py
components/bot_detector/event_queue/core/event_queue.py
test/components/bot_detector/event_queue/adapter_kafka/test_consumer_adapter_kafka.py
test/components/bot_detector/event_queue/adapter_kafka/test_queue_adapter_kafka.py
commit_error = await player_nf_queue.commit()
if commit_error:
    logger.error(
        f"[{worker_id}]: Failed to commit requeued player offset: {commit_error}"
    )
Add backoff when commit fails in this branch.
At Line 242–Line 246, commit failures only log and then immediately re-enter the loop, which can cause tight retry churn on the same record.
💡 Suggested fix
 else:
     commit_error = await player_nf_queue.commit()
     if commit_error:
         logger.error(
             f"[{worker_id}]: Failed to commit requeued player offset: {commit_error}"
         )
+        await asyncio.sleep(10)
     continue

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.
In `@bases/bot_detector/runemetrics_scraper/core.py` around lines 242 - 246, The
commit error branch for player_nf_queue.commit() currently only logs and
immediately continues, which can cause tight retry loops; modify the branch
handling the result of player_nf_queue.commit() (the block that checks
commit_error and logs with logger.error and worker_id) to perform an exponential
backoff (e.g., async sleep) before retrying, starting with a small delay and
capping the maximum delay, and reset the backoff on a successful commit;
implement this using asyncio.sleep (or the existing async sleep helper) in that
branch so the loop yields and backs off on repeated commit failures.
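The exponential backoff this prompt describes (small initial delay, doubling up to a cap, reset on success) can be sketched as below. `commit_with_backoff`, `flaky_commit`, and the tiny delays are illustrative; a real worker would use delays on the order of seconds and keep the loop running after success:

```python
import asyncio
from typing import Awaitable, Callable, Optional


async def commit_with_backoff(
    commit: Callable[[], Awaitable[Optional[Exception]]],
    base: float = 0.01,
    cap: float = 0.05,
) -> int:
    """Retry commit() until it returns None, sleeping between failures.

    The delay doubles after each failure up to `cap`; on success the
    caller would reset it to `base`. Returns the number of attempts.
    """
    delay = base
    attempts = 0
    while True:
        attempts += 1
        if await commit() is None:
            return attempts  # success: a long-running loop would reset delay here
        await asyncio.sleep(delay)
        delay = min(delay * 2, cap)


# A commit stand-in that fails twice, then succeeds.
_results = iter([RuntimeError("commit failed"), RuntimeError("commit failed"), None])


async def flaky_commit() -> Optional[Exception]:
    return next(_results)


attempts = asyncio.run(commit_with_backoff(flaky_commit))
```

Because `commit()` returns its error as a value, the backoff loop needs no try/except of its own.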
put_result = await report_queue.put(message=[report])
if isinstance(put_result, Exception):
    logger.error(f"Failed to requeue report from error queue: {put_result}")
    await asyncio.sleep(5)
Requeue failure here can drop reports from the in-memory error queue.
At Line 146, the item is already removed from error_queue; if Kafka put fails (Line 147), it is only logged/slept and effectively lost. Reinsert it for retry.
💡 Suggested fix
 put_result = await report_queue.put(message=[report])
 if isinstance(put_result, Exception):
     logger.error(f"Failed to requeue report from error queue: {put_result}")
+    await error_queue.put(report)
     await asyncio.sleep(5)
+    continue

📝 Committable suggestion
‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.
put_result = await report_queue.put(message=[report])
if isinstance(put_result, Exception):
    logger.error(f"Failed to requeue report from error queue: {put_result}")
    await error_queue.put(report)
    await asyncio.sleep(5)
    continue
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.
In `@bases/bot_detector/worker_report/main.py` around lines 146 - 149, The requeue
logic currently drops a report when report_queue.put fails because the item has
already been removed from error_queue; update the failure branch so that if
put_result is an Exception you reinsert the same report back into the in-memory
error queue (e.g., await error_queue.put(report)) before logging/sleeping;
locate the code around report_queue.put(...) and the variable names report,
error_queue, report_queue and logger in worker_report/main.py and add an await
error_queue.put(report) (or equivalent retry-safe enqueue) inside the
isinstance(put_result, Exception) branch.
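The reinsert-on-failure behaviour can be demonstrated with stand-ins. `FlakyReportQueue` and `drain` are hypothetical: the queue fails its first `put` and then succeeds, mimicking a transient broker error, and `drain` shows why reinserting the report keeps it from being lost:

```python
import asyncio
from typing import Any, List, Optional


class FlakyReportQueue:
    """Stand-in for the Kafka-backed report queue; fails on the first put."""

    def __init__(self) -> None:
        self.sent: List[Any] = []
        self._fail_once = True

    async def put(self, message: List[Any]) -> Optional[Exception]:
        if self._fail_once:
            self._fail_once = False
            return RuntimeError("broker unavailable")
        self.sent.extend(message)
        return None


async def drain(error_queue: "asyncio.Queue[Any]", report_queue: FlakyReportQueue) -> None:
    while not error_queue.empty():
        report = await error_queue.get()
        put_result = await report_queue.put(message=[report])
        if isinstance(put_result, Exception):
            # Reinsert so the report is not dropped, then back off briefly.
            await error_queue.put(report)
            await asyncio.sleep(0.01)
            continue


async def main() -> List[Any]:
    error_queue: "asyncio.Queue[Any]" = asyncio.Queue()
    await error_queue.put({"player": "alice"})
    report_queue = FlakyReportQueue()
    await drain(error_queue, report_queue)
    return report_queue.sent


sent = asyncio.run(main())
```

Without the `await error_queue.put(report)` line, the first failed `put` would silently discard the report, which is exactly the data-loss path the review comment flags.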
Motivation
Description
- Updated `AIOKafkaAdapter.put` and `AIOKafkaAdapter.commit` to return `Optional[Exception]` and propagate the underlying adapter results from `self.producer.put(messages)` and `self.consumer.commit()` respectively.
- Updated `AIOKafkaConsumerAdapter.commit` to return the underlying consumer commit result (an `Optional[Exception]` value) instead of always awaiting and discarding it.
- Updated the `QueueProducer.put` and `QueueConsumer.commit` signatures to return `Optional[Exception]` and forward backend return values.
- Updated `worker_ml`, `worker_hiscore`, `runemetrics_scraper`, `hiscore_scraper`, and `worker_report` so commit/put error values are handled consistently.
- Added tests under `test/components/bot_detector/event_queue/adapter_kafka/` to verify that `put`/`commit` error values propagate through the adapter layers and to assert return values for the success cases.
Testing
- Ran `uv run ruff format` and verified `uv run ruff format --check .` reports files formatted (final check passed).
- Ran `uv run pytest test/components/bot_detector/event_queue/adapter_kafka/test_queue_adapter_kafka.py test/components/bot_detector/event_queue/adapter_kafka/test_consumer_adapter_kafka.py`, which completed successfully (all tests passed, with existing warnings only).
Codex Task