-
-
Notifications
You must be signed in to change notification settings - Fork 6
feat(taskbroker): Batch Status Updates and Delete Completed Tasks Immediately #618
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: main
Are you sure you want to change the base?
Changes from all commits
e707b7e
b7ef805
a2fab2c
c96955e
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,62 @@ | ||
| use std::future::Future; | ||
| use std::pin::Pin; | ||
| use std::time::Duration; | ||
|
|
||
| use anyhow::Result; | ||
| use tokio::sync::mpsc::Receiver; | ||
|
|
||
| /// Run flusher that receives values of type T from a channel and flushes | ||
| /// them using the provided async `flush` function either when the batch is | ||
| /// full or when the max flush interval has elapsed. | ||
| pub async fn run_flusher<T, F>( | ||
| mut rx: Receiver<T>, | ||
| batch_size: usize, | ||
| interval_ms: u64, | ||
| mut flush: F, | ||
| ) -> Result<()> | ||
| where | ||
| F: for<'a> FnMut(&'a mut Vec<T>) -> Pin<Box<dyn Future<Output = ()> + Send + 'a>>, | ||
| { | ||
| let batch_size = batch_size.max(1); | ||
| let interval_ms = interval_ms.max(1); | ||
|
|
||
| let period = Duration::from_millis(interval_ms); | ||
| let mut interval = tokio::time::interval(period); | ||
| interval.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Skip); | ||
|
|
||
| let mut buffer: Vec<T> = Vec::with_capacity(batch_size); | ||
|
|
||
| loop { | ||
| tokio::select! { | ||
| msg = rx.recv() => { | ||
| match msg { | ||
| Some(v) => { | ||
| buffer.push(v); | ||
|
|
||
| while let Ok(update) = rx.try_recv() { | ||
| buffer.push(update); | ||
| } | ||
|
|
||
| if buffer.len() >= batch_size { | ||
| flush(&mut buffer).await; | ||
| } | ||
| } | ||
|
|
||
| None => { | ||
| // Channel closed (shutdown), flush remaining and exit | ||
| flush(&mut buffer).await; | ||
| break; | ||
| } | ||
| } | ||
| } | ||
|
|
||
| _ = interval.tick() => { | ||
| if !buffer.is_empty() { | ||
| flush(&mut buffer).await; | ||
| } | ||
| } | ||
| } | ||
| } | ||
|
|
||
| Ok(()) | ||
| } | ||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -1,5 +1,7 @@ | ||
| pub mod auth_middleware; | ||
| pub mod metrics_middleware; | ||
| pub mod server; | ||
| pub mod status_flusher; | ||
|
|
||
| #[cfg(test)] | ||
| mod server_tests; |
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -1,15 +1,18 @@ | ||
| use std::collections::HashMap; | ||
| use std::sync::Arc; | ||
| use std::time::Instant; | ||
|
|
||
| use anyhow::Result; | ||
| use chrono::Utc; | ||
| use prost::Message; | ||
| use sentry_protos::taskbroker::v1::consumer_service_server::ConsumerService; | ||
| use sentry_protos::taskbroker::v1::{ | ||
| FetchNextTask, GetTaskRequest, GetTaskResponse, SetTaskStatusRequest, SetTaskStatusResponse, | ||
| TaskActivation, TaskActivationStatus, | ||
| }; | ||
| use tokio::sync::mpsc; | ||
| use tonic::{Request, Response, Status}; | ||
| use tracing::{error, instrument, warn}; | ||
| use tracing::{debug, error, instrument, warn}; | ||
|
|
||
| use crate::config::{Config, DeliveryMode}; | ||
| use crate::store::activation::InflightActivationStatus; | ||
|
|
@@ -18,6 +21,7 @@ use crate::store::traits::InflightActivationStore; | |
| pub struct TaskbrokerServer { | ||
| pub store: Arc<dyn InflightActivationStore>, | ||
| pub config: Arc<Config>, | ||
| pub status_tx: Option<mpsc::Sender<StatusUpdate>>, | ||
| } | ||
|
|
||
| #[tonic::async_trait] | ||
|
|
@@ -97,10 +101,20 @@ impl ConsumerService for TaskbrokerServer { | |
| "Invalid status, expects 3 (Failure), 4 (Retry), or 5 (Complete), but got: {status:?}" | ||
| ))); | ||
| } | ||
|
|
||
| if status == InflightActivationStatus::Failure { | ||
| metrics::counter!("grpc_server.set_status.failure").increment(1); | ||
| } | ||
|
|
||
| if let Some(ref tx) = self.status_tx { | ||
| tx.send((id, status)) | ||
| .await | ||
| .map_err(|_| Status::internal("Status update channel closed"))?; | ||
|
|
||
| metrics::histogram!("grpc_server.set_status.duration").record(start_time.elapsed()); | ||
| return Ok(Response::new(SetTaskStatusResponse { task: None })); | ||
| } | ||
|
|
||
| match self.store.set_status(&id, status).await { | ||
| Ok(Some(_)) => metrics::counter!( | ||
| "grpc_server.set_status", | ||
|
|
@@ -194,3 +208,58 @@ impl ConsumerService for TaskbrokerServer { | |
| res | ||
| } | ||
| } | ||
|
|
||
| pub type StatusUpdate = (String, InflightActivationStatus); | ||
|
|
||
| pub async fn flush_status_updates( | ||
| store: Arc<dyn InflightActivationStore>, | ||
| buffer: &mut Vec<StatusUpdate>, | ||
| ) { | ||
| if buffer.is_empty() { | ||
| return; | ||
| } | ||
|
|
||
| let updates = std::mem::take(buffer); | ||
| let mut by_status: HashMap<InflightActivationStatus, Vec<String>> = HashMap::new(); | ||
|
|
||
| for (id, status) in updates { | ||
| by_status.entry(status).or_default().push(id); | ||
| } | ||
|
|
||
| let mut success = 0; | ||
| let mut fail = 0; | ||
|
|
||
| for (status, ids) in by_status { | ||
| let count = ids.len() as u64; | ||
|
|
||
| match store | ||
| .set_status_batch(&ids, status) | ||
| .await | ||
| .map(|()| ids.len() as u64) | ||
| { | ||
| Ok(count) => { | ||
| success += count; | ||
| debug!(?status, ?count, "Flushed status batch"); | ||
| } | ||
|
|
||
| Err(e) => { | ||
| fail += count; | ||
|
|
||
| error!( | ||
| ?status, | ||
| ?count, | ||
| error = ?e, | ||
| "Failed to flush status batch" | ||
| ); | ||
|
|
||
| // Push failed updates back into the buffer so they can be retried on next flush | ||
| for id in ids { | ||
| buffer.push((id, status)); | ||
| } | ||
| } | ||
| } | ||
| } | ||
|
Comment on lines
+251
to
+261
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Bug: The implementation doesn't immediately delete completed tasks. It updates their status to Suggested FixIn Prompt for AI Agent |
||
|
|
||
| metrics::gauge!(format!("grpc_server.flush_status_updates.success")).set(success as f64); | ||
| metrics::gauge!(format!("grpc_server.flush_status_updates.fail")).set(fail as f64); | ||
| } | ||
|
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Completed tasks not deleted, only status-updatedHigh Severity
Additional Locations (1)Reviewed by Cursor Bugbot for commit c96955e. Configure here. |
||


There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I created this function because I'm also planning to batch claimed → processing updates in the push pool, which will use basically identical machinery.