
Propagate Kafka queue put/commit errors to callers #94

Merged

extreme4all merged 1 commit into develop from codex/update-aiokafkaadapter-methods-and-tests on Mar 3, 2026

Conversation

@extreme4all extreme4all (Contributor) commented Mar 3, 2026

Motivation

  • Make queue semantics explicit by surfacing producer/consumer errors instead of swallowing them so caller code (worker loops and producers) can react (log, retry/backoff, or fail fast).

Description

  • Change AIOKafkaAdapter.put and AIOKafkaAdapter.commit to return Optional[Exception] and propagate the underlying adapter results from self.producer.put(messages) and self.consumer.commit() respectively.
  • Update AIOKafkaConsumerAdapter.commit to return the underlying consumer commit result (an Optional[Exception] value) instead of always awaiting and discarding it.
  • Update high-level queue wrappers to propagate backend results by changing QueueProducer.put and QueueConsumer.commit signatures to return Optional[Exception] and forward backend return values.
  • Add explicit error handling at Queue put/commit call sites in base worker loops (log + retry/backoff or skip/continue) in worker_ml, worker_hiscore, runemetrics_scraper, hiscore_scraper, and worker_report so commit/put error values are handled consistently.
  • Add/adjust unit tests under test/components/bot_detector/event_queue/adapter_kafka/ to verify that put/commit error values propagate through the adapter layers and to assert return values for the success cases.
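
For callers, the changes above amount to checking the returned Optional[Exception] and backing off before retrying. A minimal sketch of that pattern follows; `FakeQueue` and `commit_with_retry` are illustrative stand-ins, not code from this PR:

```python
import asyncio
import logging
from typing import Optional

logger = logging.getLogger("worker")


class FakeQueue:
    """Illustrative stand-in: commit() returns Optional[Exception], as in this PR."""

    def __init__(self, results):
        self._results = list(results)

    async def commit(self) -> Optional[Exception]:
        return self._results.pop(0) if self._results else None


async def commit_with_retry(queue, retries: int = 3, delay: float = 0.0) -> bool:
    # Check the returned error value; log and back off briefly before retrying.
    for attempt in range(1, retries + 1):
        error = await queue.commit()
        if error is None:
            return True
        logger.error(f"commit failed (attempt {attempt}): {error}")
        await asyncio.sleep(delay)
    return False


# First commit fails, second succeeds, so the helper reports success.
queue = FakeQueue([RuntimeError("broker unavailable"), None])
print(asyncio.run(commit_with_retry(queue)))  # True
```

Because the error is a return value rather than a raised exception, the worker loop decides per call site whether to retry, skip, or fail fast.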

Testing

  • Ran `uv run ruff format` to fix file formatting and verified that `uv run ruff format --check .` reports all files formatted (final check passed).
  • Ran targeted unit tests with `uv run pytest test/components/bot_detector/event_queue/adapter_kafka/test_queue_adapter_kafka.py test/components/bot_detector/event_queue/adapter_kafka/test_consumer_adapter_kafka.py`; all tests passed, with existing warnings only.
  • No integration or runtime tests against a Kafka cluster were run as part of this change; only the adapter unit tests were executed.

Codex Task

Summary by CodeRabbit

  • Bug Fixes
    • Enhanced error handling in queue operations across all workers. Queue operations now explicitly detect and log failures with automatic retry logic and brief delays, improving system resilience when operations fail.
    • Consistent error detection and recovery mechanisms throughout the queue management infrastructure.

@coderabbitai coderabbitai bot (Contributor) commented Mar 3, 2026

📝 Walkthrough

The pull request systematically enhances error handling across queue operations by modifying adapter methods to return Optional[Exception] and updating all consuming code to check commit/put results, logging errors and retrying when exceptions occur.

Changes

  • Kafka Adapter Infrastructure (components/bot_detector/event_queue/adapters/kafka/adapter.py, components/bot_detector/event_queue/core/event_queue.py): Modified put() and commit() methods to return Optional[Exception] instead of void, propagating errors as return values for explicit caller handling.
  • Bot Detector Hiscore & Runemetrics Scrapers (bases/bot_detector/hiscore_scraper/core.py, bases/bot_detector/runemetrics_scraper/core.py): Added commit result checks; on error, log failure details and return None or sleep-and-retry depending on error path.
  • Bot Detector Worker Services (bases/bot_detector/worker_hiscore/core.py, bases/bot_detector/worker_ml/core.py, bases/bot_detector/worker_report/main.py): Enhanced commit/put result handling across multiple task paths; capture exceptions, log errors with context, sleep, and retry on failure.
  • Event Queue Tests (test/components/bot_detector/event_queue/adapter_kafka/test_consumer_adapter_kafka.py, test/components/bot_detector/event_queue/adapter_kafka/test_queue_adapter_kafka.py): Updated existing tests to verify success paths return None; added new tests to verify exception propagation from commit/put operations.
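
The propagation behavior these tests assert can be sketched with a minimal stand-in; `FakeConsumer` and `QueueAdapter` here are hypothetical simplifications, not the project's real classes:

```python
import asyncio
from typing import Optional


class FakeConsumer:
    """Minimal stand-in mirroring the consumer adapter's commit semantics."""

    def __init__(self, error: Optional[Exception] = None):
        self._error = error

    async def commit(self) -> Optional[Exception]:
        return self._error


class QueueAdapter:
    """Sketch of the wrapper layer: it forwards the backend's return value."""

    def __init__(self, consumer: FakeConsumer):
        self.consumer = consumer

    async def commit(self) -> Optional[Exception]:
        return await self.consumer.commit()


# Success path returns None; failure path propagates the same exception object.
assert asyncio.run(QueueAdapter(FakeConsumer()).commit()) is None
boom = RuntimeError("commit failed")
assert asyncio.run(QueueAdapter(FakeConsumer(boom)).commit()) is boom
print("ok")
```

Asserting identity (`is boom`) rather than mere truthiness verifies the wrapper forwards the exact error object instead of wrapping or discarding it.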

Sequence Diagram(s)

sequenceDiagram
    participant Worker as Worker
    participant Queue as Queue Interface
    participant Adapter as Kafka Adapter
    participant Kafka as Kafka Broker

    rect rgba(100, 150, 200, 0.5)
        Note over Worker,Kafka: Error Handling Flow - Before
        Worker->>Queue: commit()
        Queue->>Adapter: commit()
        Adapter->>Kafka: commit()
        Kafka-->>Adapter: OK
        Adapter-->>Queue: (no return)
        Queue-->>Worker: (no return)
        Worker->>Worker: Continue execution
    end

    rect rgba(150, 200, 100, 0.5)
        Note over Worker,Kafka: Error Handling Flow - After
        Worker->>Queue: commit()
        Queue->>Adapter: commit()
        Adapter->>Kafka: commit()
        alt Success
            Kafka-->>Adapter: OK
            Adapter-->>Queue: None
            Queue-->>Worker: None
            Worker->>Worker: Continue execution
        else Error Occurs
            Kafka-->>Adapter: Exception
            Adapter-->>Queue: Exception
            Queue-->>Worker: Exception
            Worker->>Worker: Log error
            Worker->>Worker: Sleep
            Worker->>Worker: Retry loop
        end
    end

Estimated Code Review Effort

🎯 3 (Moderate) | ⏱️ ~25 minutes

Possibly Related PRs

  • chore/kafka-migration #72 (chore/kafka-migration): Introduced the event_queue abstractions that establish put() and commit() methods returning Optional[Exception], making this PR's error-handling pattern necessary for proper caller integration.

Poem

🐰 Errors once silent, now speak with care,
Each commit checked, handled with flair!
Return values flow like carrots so bright,
Retry on failure—we'll get it right! 🥕
From queue to worker, errors take flight.

🚥 Pre-merge checks | ✅ 2 | ❌ 1

❌ Failed checks (1 warning)

  • Docstring Coverage (⚠️ Warning): Docstring coverage is 0.00%, below the required threshold of 80.00%. Resolution: write docstrings for the functions missing them to satisfy the coverage threshold.

✅ Passed checks (2 passed)

  • Description Check (✅ Passed): Check skipped; CodeRabbit's high-level summary is enabled.
  • Title Check (✅ Passed): The title 'Propagate Kafka queue put/commit errors to callers' accurately and specifically summarizes the main change: modifying queue adapter methods to return Optional[Exception] and updating callers to handle these errors.


Warning

Review ran into problems

🔥 Problems

Git: Failed to clone repository. Please run the @coderabbitai full review command to re-trigger a full review. If the issue persists, set path_filters to include or exclude specific files.


@coderabbitai coderabbitai bot left a comment

Actionable comments posted: 2

Caution

Some comments are outside the diff and can’t be posted inline due to platform limitations.

⚠️ Outside diff range comments (1)
components/bot_detector/event_queue/adapters/kafka/adapter.py (1)

195-200: ⚠️ Potential issue | 🟠 Major

commit() must catch exceptions from aiokafka and return them as error values.

AIOKafkaConsumer.commit() in aiokafka 0.12.0 raises exceptions (CommitFailedError, IllegalOperation, IllegalStateError, KafkaError) on failure instead of returning error values. Line 200 directly awaits without handling, causing exceptions to propagate and crash callers expecting Optional[Exception] return values.

Suggested fix
     async def commit(self) -> Optional[Exception]:
         if self.consumer is None:
             return ConsumerNotStartedError(
                 "Consumer is None, did you start the consumer?"
             )
-        return await self.consumer.commit()
+        try:
+            await self.consumer.commit()
+            return None
+        except Exception as e:
+            return e

Also applies to: line 236

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@components/bot_detector/event_queue/adapters/kafka/adapter.py` around lines
195 - 200, The commit() method currently awaits self.consumer.commit() directly
which lets aiokafka exceptions (CommitFailedError, IllegalOperation,
IllegalStateError, KafkaError) propagate; change commit() in the adapter to wrap
the await in a try/except that catches Exception (or the specific aiokafka
exceptions) and returns the caught exception instead of raising it, keeping the
existing ConsumerNotStartedError return when consumer is None; apply the same
try/except-and-return-exception pattern to the other adapter method that awaits
self.consumer.commit() near line 236 so both places return Optional[Exception]
on failure rather than throwing.
🧹 Nitpick comments (1)
test/components/bot_detector/event_queue/adapter_kafka/test_queue_adapter_kafka.py (1)

62-69: Consider extracting duplicated KafkaConfig setup into a fixture/helper.

The same config block appears multiple times, which makes tests noisier and harder to update consistently.

♻️ Suggested refactor
+def _build_kafka_config() -> KafkaConfig:
+    return KafkaConfig(
+        topic="players",
+        bootstrap_servers="localhost:9092",
+        consumer=True,
+        producer=True,
+        consumer_config=KafkaConsumerConfig(group_id="group"),
+        producer_config=KafkaProducerConfig(partition_key_fn=lambda _: "1"),
+    )
+
 @pytest.mark.asyncio
 async def test_queue_adapter_propagates_put_error() -> None:
-    config = KafkaConfig(
-        topic="players",
-        bootstrap_servers="localhost:9092",
-        consumer=True,
-        producer=True,
-        consumer_config=KafkaConsumerConfig(group_id="group"),
-        producer_config=KafkaProducerConfig(partition_key_fn=lambda _: "1"),
-    )
+    config = _build_kafka_config()
     adapter = AIOKafkaAdapter(PlayerScraped, config)

Also applies to: 82-89

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In
`@test/components/bot_detector/event_queue/adapter_kafka/test_queue_adapter_kafka.py`
around lines 62 - 69, The KafkaConfig construction is duplicated across tests
(e.g., the KafkaConfig(...) block used in test_queue_adapter_kafka.py around the
config variable); extract that repeated setup into a single reusable fixture or
helper function (e.g., a pytest fixture named kafka_config or a helper
build_kafka_config()) and replace the inline KafkaConfig instantiations with
references to that fixture/helper; ensure the fixture returns a KafkaConfig
instance with the same fields (topic, bootstrap_servers, consumer/producer
flags, consumer_config, producer_config) so all tests (including the other
occurrence at lines 82-89) use the centralized config and remain consistent.
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.

Inline comments:
In `@bases/bot_detector/runemetrics_scraper/core.py`:
- Around line 242-246: The commit error branch for player_nf_queue.commit()
currently only logs and immediately continues, which can cause tight retry
loops; modify the branch handling the result of player_nf_queue.commit() (the
block that checks commit_error and logs with logger.error and worker_id) to
perform an exponential backoff (e.g., async sleep) before retrying, starting
with a small delay and capping the maximum delay, and reset the backoff on a
successful commit; implement this using asyncio.sleep (or the existing async
sleep helper) in that branch so the loop yields and backs off on repeated commit
failures.

In `@bases/bot_detector/worker_report/main.py`:
- Around line 146-149: The requeue logic currently drops a report when
report_queue.put fails because the item has already been removed from
error_queue; update the failure branch so that if put_result is an Exception you
reinsert the same report back into the in-memory error queue (e.g., await
error_queue.put(report)) before logging/sleeping; locate the code around
report_queue.put(...) and the variable names report, error_queue, report_queue
and logger in worker_report/main.py and add an await error_queue.put(report) (or
equivalent retry-safe enqueue) inside the isinstance(put_result, Exception)
branch.

---

Outside diff comments:
In `@components/bot_detector/event_queue/adapters/kafka/adapter.py`:
- Around line 195-200: The commit() method currently awaits
self.consumer.commit() directly which lets aiokafka exceptions
(CommitFailedError, IllegalOperation, IllegalStateError, KafkaError) propagate;
change commit() in the adapter to wrap the await in a try/except that catches
Exception (or the specific aiokafka exceptions) and returns the caught exception
instead of raising it, keeping the existing ConsumerNotStartedError return when
consumer is None; apply the same try/except-and-return-exception pattern to the
other adapter method that awaits self.consumer.commit() near line 236 so both
places return Optional[Exception] on failure rather than throwing.

---

Nitpick comments:
In
`@test/components/bot_detector/event_queue/adapter_kafka/test_queue_adapter_kafka.py`:
- Around line 62-69: The KafkaConfig construction is duplicated across tests
(e.g., the KafkaConfig(...) block used in test_queue_adapter_kafka.py around the
config variable); extract that repeated setup into a single reusable fixture or
helper function (e.g., a pytest fixture named kafka_config or a helper
build_kafka_config()) and replace the inline KafkaConfig instantiations with
references to that fixture/helper; ensure the fixture returns a KafkaConfig
instance with the same fields (topic, bootstrap_servers, consumer/producer
flags, consumer_config, producer_config) so all tests (including the other
occurrence at lines 82-89) use the centralized config and remain consistent.

ℹ️ Review info

Configuration used: Organization UI

Review profile: CHILL

Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between 7099fbc and 345af83.

📒 Files selected for processing (9)
  • bases/bot_detector/hiscore_scraper/core.py
  • bases/bot_detector/runemetrics_scraper/core.py
  • bases/bot_detector/worker_hiscore/core.py
  • bases/bot_detector/worker_ml/core.py
  • bases/bot_detector/worker_report/main.py
  • components/bot_detector/event_queue/adapters/kafka/adapter.py
  • components/bot_detector/event_queue/core/event_queue.py
  • test/components/bot_detector/event_queue/adapter_kafka/test_consumer_adapter_kafka.py
  • test/components/bot_detector/event_queue/adapter_kafka/test_queue_adapter_kafka.py

Comment on lines +242 to +246

    commit_error = await player_nf_queue.commit()
    if commit_error:
        logger.error(
            f"[{worker_id}]: Failed to commit requeued player offset: {commit_error}"
        )

⚠️ Potential issue | 🟠 Major

Add backoff when commit fails in this branch.

At Line 242–Line 246, commit failures only log and then immediately re-enter the loop, which can cause tight retry churn on the same record.

💡 Suggested fix
                 else:
                     commit_error = await player_nf_queue.commit()
                     if commit_error:
                         logger.error(
                             f"[{worker_id}]: Failed to commit requeued player offset: {commit_error}"
                         )
+                        await asyncio.sleep(10)
                 continue
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@bases/bot_detector/runemetrics_scraper/core.py` around lines 242 - 246, The
commit error branch for player_nf_queue.commit() currently only logs and
immediately continues, which can cause tight retry loops; modify the branch
handling the result of player_nf_queue.commit() (the block that checks
commit_error and logs with logger.error and worker_id) to perform an exponential
backoff (e.g., async sleep) before retrying, starting with a small delay and
capping the maximum delay, and reset the backoff on a successful commit;
implement this using asyncio.sleep (or the existing async sleep helper) in that
branch so the loop yields and backs off on repeated commit failures.
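
The exponential backoff the prompt describes could look like the following sketch; `commit_with_backoff`, `FlakyQueue`, and the injected `sleep` callable are illustrative assumptions, not code from this PR:

```python
import asyncio


async def commit_with_backoff(queue, sleep, base: float = 1.0, cap: float = 8.0) -> int:
    # Retry until commit succeeds; double the delay after each failure, capped at `cap`.
    delay = base
    attempts = 0
    while True:
        attempts += 1
        error = await queue.commit()
        if error is None:
            return attempts
        await sleep(delay)
        delay = min(delay * 2, cap)


class FlakyQueue:
    """Illustrative queue whose commit() fails a fixed number of times, then succeeds."""

    def __init__(self, failures: int):
        self._failures = failures

    async def commit(self):
        if self._failures > 0:
            self._failures -= 1
            return RuntimeError("commit failed")
        return None


delays = []


async def record_sleep(d):
    # Injected in place of asyncio.sleep so the example runs instantly.
    delays.append(d)


attempts = asyncio.run(commit_with_backoff(FlakyQueue(4), record_sleep))
print(attempts, delays)  # 5 [1.0, 2.0, 4.0, 8.0]
```

Resetting the delay to `base` on a successful commit (here implicit, since the function returns) keeps a single transient failure from inflating all later waits.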

Comment on lines +146 to +149

    put_result = await report_queue.put(message=[report])
    if isinstance(put_result, Exception):
        logger.error(f"Failed to requeue report from error queue: {put_result}")
        await asyncio.sleep(5)

⚠️ Potential issue | 🟠 Major

Requeue failure here can drop reports from the in-memory error queue.

At Line 146, the item is already removed from error_queue; if Kafka put fails (Line 147), it is only logged/slept and effectively lost. Reinsert it for retry.

💡 Suggested fix
         put_result = await report_queue.put(message=[report])
         if isinstance(put_result, Exception):
             logger.error(f"Failed to requeue report from error queue: {put_result}")
+            await error_queue.put(report)
             await asyncio.sleep(5)
+            continue
📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change (before):

    put_result = await report_queue.put(message=[report])
    if isinstance(put_result, Exception):
        logger.error(f"Failed to requeue report from error queue: {put_result}")
        await asyncio.sleep(5)

Suggested change (after):

    put_result = await report_queue.put(message=[report])
    if isinstance(put_result, Exception):
        logger.error(f"Failed to requeue report from error queue: {put_result}")
        await error_queue.put(report)
        await asyncio.sleep(5)
        continue
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@bases/bot_detector/worker_report/main.py` around lines 146 - 149, The requeue
logic currently drops a report when report_queue.put fails because the item has
already been removed from error_queue; update the failure branch so that if
put_result is an Exception you reinsert the same report back into the in-memory
error queue (e.g., await error_queue.put(report)) before logging/sleeping;
locate the code around report_queue.put(...) and the variable names report,
error_queue, report_queue and logger in worker_report/main.py and add an await
error_queue.put(report) (or equivalent retry-safe enqueue) inside the
isinstance(put_result, Exception) branch.

@extreme4all merged commit 6d9185d into develop on Mar 3, 2026
33 checks passed
@extreme4all deleted the codex/update-aiokafkaadapter-methods-and-tests branch on March 3, 2026 at 22:25