-
Notifications
You must be signed in to change notification settings - Fork 2
Propagate Kafka queue put/commit errors to callers #94
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change | ||||||||||||||||||||
|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|
|
|
@@ -128,7 +128,10 @@ async def consume_many_task( | |||||||||||||||||||||
| logger.debug(f"Traceback: \n{traceback.format_exc()}") | ||||||||||||||||||||||
| await asyncio.sleep(5) | ||||||||||||||||||||||
| if should_commit: | ||||||||||||||||||||||
| await report_queue.commit() | ||||||||||||||||||||||
| commit_result = await report_queue.commit() | ||||||||||||||||||||||
| if isinstance(commit_result, Exception): | ||||||||||||||||||||||
| logger.error(f"Failed to commit processed reports: {commit_result}") | ||||||||||||||||||||||
| await asyncio.sleep(5) | ||||||||||||||||||||||
|
|
||||||||||||||||||||||
|
|
||||||||||||||||||||||
| async def error_task( | ||||||||||||||||||||||
|
|
@@ -140,7 +143,10 @@ async def error_task( | |||||||||||||||||||||
| if not isinstance(report, ReportsToInsertStruct): | ||||||||||||||||||||||
| logger.warning(f"invalid {report=}") | ||||||||||||||||||||||
| continue | ||||||||||||||||||||||
| await report_queue.put(message=[report]) | ||||||||||||||||||||||
| put_result = await report_queue.put(message=[report]) | ||||||||||||||||||||||
| if isinstance(put_result, Exception): | ||||||||||||||||||||||
| logger.error(f"Failed to requeue report from error queue: {put_result}") | ||||||||||||||||||||||
| await asyncio.sleep(5) | ||||||||||||||||||||||
|
Comment on lines
+146
to
+149
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Requeue failure here can drop reports from the in-memory error queue. At Line 146, the item is already removed from 💡 Suggested fix put_result = await report_queue.put(message=[report])
if isinstance(put_result, Exception):
logger.error(f"Failed to requeue report from error queue: {put_result}")
+ await error_queue.put(report)
await asyncio.sleep(5)
+ continue📝 Committable suggestion
Suggested change
🤖 Prompt for AI Agents |
||||||||||||||||||||||
|
|
||||||||||||||||||||||
|
|
||||||||||||||||||||||
| async def main(): | ||||||||||||||||||||||
|
|
||||||||||||||||||||||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Add backoff when commit fails in this branch.
At Line 242–Line 246, commit failures only log and then immediately re-enter the loop, which can cause tight retry churn on the same record.
💡 Suggested fix
else: commit_error = await player_nf_queue.commit() if commit_error: logger.error( f"[{worker_id}]: Failed to commit requeued player offset: {commit_error}" ) + await asyncio.sleep(10) continue🤖 Prompt for AI Agents