8 changes: 8 additions & 0 deletions src/config.rs
@@ -293,6 +293,12 @@ pub struct Config {
/// Maximum time in milliseconds for a single push RPC to the worker service. This should be greater than the worker's internal timeout.
pub push_timeout_ms: u64,

/// The number of status updates to accumulate before flushing a batch. Only active in push mode.
pub status_flush_batch_size: usize,

/// Maximum milliseconds to wait before flushing a batch of status updates.
pub status_flush_interval_ms: u64,

/// The hostname used to construct `callback_url` for task push requests.
pub callback_addr: String,

@@ -383,6 +389,8 @@ impl Default for Config {
push_queue_size: 1,
push_queue_timeout_ms: 5000,
push_timeout_ms: 30000,
status_flush_batch_size: 1,
status_flush_interval_ms: 100,
callback_addr: "0.0.0.0".into(),
callback_port: 50051,
worker_map: [("sentry".into(), "http://127.0.0.1:50052".into())].into(),
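These two knobs trade batching efficiency against status latency: a larger batch amortizes database writes, while the interval bounds how long an update can sit in the buffer. A hypothetical override (values are illustrative; the struct-update syntax relies on the `impl Default for Config` shown above):

let config = Config {
    // Flush once 100 updates are buffered...
    status_flush_batch_size: 100,
    // ...or after 50ms, whichever comes first.
    status_flush_interval_ms: 50,
    ..Config::default()
};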
10 changes: 8 additions & 2 deletions src/fetch/mod.rs
@@ -109,13 +109,19 @@ impl<T: TaskPusher + Send + Sync + 'static> FetchPool<T> {
}

_ = async {
let start = Instant::now();

debug!("Fetching next batch of pending activations...");
metrics::counter!("fetch.loop.count").increment(1);

let start = Instant::now();
let mut backoff = false;

match store.claim_activations_for_push(limit, bucket).await {
let start_claim = Instant::now();
let result = store.claim_activations_for_push(limit, bucket).await;
metrics::histogram!("fetch.claim_activations_for_push.duration")
.record(start_claim.elapsed());

match result {
Ok(activations) if activations.is_empty() => {
metrics::counter!("fetch.empty").increment(1);
debug!("No pending activations");
12 changes: 12 additions & 0 deletions src/fetch/tests.rs
@@ -122,6 +122,18 @@ impl InflightActivationStore for MockStore {
unimplemented!()
}

async fn set_status_batch(
&self,
_ids: &[String],
_status: InflightActivationStatus,
) -> Result<(), Error> {
Ok(())
}

async fn delete_activation_batch(&self, _ids: &[String]) -> Result<u64, Error> {
Ok(0)
}

async fn set_processing_deadline(
&self,
_id: &str,
62 changes: 62 additions & 0 deletions src/flusher.rs
@@ -0,0 +1,62 @@
use std::future::Future;
use std::pin::Pin;
use std::time::Duration;

use anyhow::Result;
use tokio::sync::mpsc::Receiver;

/// Runs a flusher that receives values of type `T` from a channel and
/// flushes them with the provided async `flush` function, either when the
/// batch is full or when the max flush interval has elapsed.
pub async fn run_flusher<T, F>(
mut rx: Receiver<T>,
batch_size: usize,
interval_ms: u64,
mut flush: F,
) -> Result<()>
where
F: for<'a> FnMut(&'a mut Vec<T>) -> Pin<Box<dyn Future<Output = ()> + Send + 'a>>,
{
let batch_size = batch_size.max(1);
let interval_ms = interval_ms.max(1);

let period = Duration::from_millis(interval_ms);
let mut interval = tokio::time::interval(period);
interval.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Skip);

let mut buffer: Vec<T> = Vec::with_capacity(batch_size);

loop {
tokio::select! {
msg = rx.recv() => {
match msg {
Some(v) => {
buffer.push(v);

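// Opportunistically drain anything already queued to fill the batch faster.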
while let Ok(update) = rx.try_recv() {
buffer.push(update);
}

if buffer.len() >= batch_size {
flush(&mut buffer).await;
}
}

None => {
// Channel closed (shutdown), flush remaining and exit
flush(&mut buffer).await;
break;
}
}
}

_ = interval.tick() => {
if !buffer.is_empty() {
flush(&mut buffer).await;
}
}
}
}

Ok(())
}

Member Author (on run_flusher): I created this function because I'm also planning to batch claimed → processing updates in the push pool, which will use basically identical machinery.
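As a usage sketch, the flusher only needs a receiver, the two knobs, and a flush callback (every name below other than run_flusher is illustrative, not from this PR). A free function is used for the callback because closures returning boxed futures that borrow their argument can trip over higher-ranked lifetime inference:

use std::future::Future;
use std::pin::Pin;
use tokio::sync::mpsc;

// Illustrative callback: a real caller would issue a batched store write here.
fn print_and_clear(buffer: &mut Vec<String>) -> Pin<Box<dyn Future<Output = ()> + Send + '_>> {
    Box::pin(async move {
        println!("flushing {} queued updates", buffer.len());
        buffer.clear();
    })
}

#[tokio::main]
async fn main() -> anyhow::Result<()> {
    let (tx, rx) = mpsc::channel::<String>(1024);
    // Flush at 100 buffered items or every 50ms, whichever comes first.
    let flusher = tokio::spawn(run_flusher(rx, 100, 50, print_and_clear));

    tx.send("task-id-1".into()).await?;
    drop(tx); // closing the channel flushes the remainder and stops the flusher

    flusher.await??;
    Ok(())
}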
2 changes: 2 additions & 0 deletions src/grpc/mod.rs
@@ -1,5 +1,7 @@
pub mod auth_middleware;
pub mod metrics_middleware;
pub mod server;
pub mod status_flusher;

#[cfg(test)]
mod server_tests;
71 changes: 70 additions & 1 deletion src/grpc/server.rs
@@ -1,15 +1,18 @@
use std::collections::HashMap;
use std::sync::Arc;
use std::time::Instant;

use anyhow::Result;
use chrono::Utc;
use prost::Message;
use sentry_protos::taskbroker::v1::consumer_service_server::ConsumerService;
use sentry_protos::taskbroker::v1::{
FetchNextTask, GetTaskRequest, GetTaskResponse, SetTaskStatusRequest, SetTaskStatusResponse,
TaskActivation, TaskActivationStatus,
};
use tokio::sync::mpsc;
use tonic::{Request, Response, Status};
use tracing::{error, instrument, warn};
use tracing::{debug, error, instrument, warn};

use crate::config::{Config, DeliveryMode};
use crate::store::activation::InflightActivationStatus;
@@ -18,6 +21,7 @@ use crate::store::traits::InflightActivationStore;
pub struct TaskbrokerServer {
pub store: Arc<dyn InflightActivationStore>,
pub config: Arc<Config>,
pub status_tx: Option<mpsc::Sender<StatusUpdate>>,
}

#[tonic::async_trait]
@@ -97,10 +101,20 @@ impl ConsumerService for TaskbrokerServer {
"Invalid status, expects 3 (Failure), 4 (Retry), or 5 (Complete), but got: {status:?}"
)));
}

if status == InflightActivationStatus::Failure {
metrics::counter!("grpc_server.set_status.failure").increment(1);
}

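// Push mode: hand the update to the flusher channel instead of writing to the store synchronously.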
if let Some(ref tx) = self.status_tx {
tx.send((id, status))
.await
.map_err(|_| Status::internal("Status update channel closed"))?;

metrics::histogram!("grpc_server.set_status.duration").record(start_time.elapsed());
return Ok(Response::new(SetTaskStatusResponse { task: None }));
}

match self.store.set_status(&id, status).await {
Ok(Some(_)) => metrics::counter!(
"grpc_server.set_status",
@@ -194,3 +208,58 @@ impl ConsumerService for TaskbrokerServer {
res
}
}

pub type StatusUpdate = (String, InflightActivationStatus);

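/// Drains the buffer, grouping queued updates by status and issuing one
/// batched store write per status. Batches that fail are pushed back into
/// the buffer so the next flush retries them.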
pub async fn flush_status_updates(
store: Arc<dyn InflightActivationStore>,
buffer: &mut Vec<StatusUpdate>,
) {
if buffer.is_empty() {
return;
}

let updates = std::mem::take(buffer);
let mut by_status: HashMap<InflightActivationStatus, Vec<String>> = HashMap::new();

for (id, status) in updates {
by_status.entry(status).or_default().push(id);
}

let mut success = 0;
let mut fail = 0;

for (status, ids) in by_status {
let count = ids.len() as u64;

match store.set_status_batch(&ids, status).await {
Ok(()) => {
success += count;
debug!(?status, ?count, "Flushed status batch");
}

Err(e) => {
fail += count;

error!(
?status,
?count,
error = ?e,
"Failed to flush status batch"
);

// Push failed updates back into the buffer so they can be retried on next flush
for id in ids {
buffer.push((id, status));
}
}
}
}
Comment on lines +251 to +261
Bug: The implementation doesn't immediately delete completed tasks. It updates their status to Complete, and a separate upkeep task deletes them later, contrary to the PR's goal.
Severity: HIGH

Suggested Fix

In main.rs, replace the generic run_flusher + flush_status_updates wiring with a call to the specific run_status_flusher function. This ensures that when a task's status is Complete, the task is immediately deleted from the database as intended.

Prompt for AI Agent
Review the code at the location below. A potential bug has been identified by an AI agent. Verify if this is a real issue. If it is, propose a fix; if not, explain why it's not valid.

Location: src/grpc/server.rs#L232-L261

Potential issue: The implementation does not immediately delete completed tasks as intended. When a task is marked `Complete`, its status is updated in the database, but the task record is not deleted. Deletion only occurs later during a periodic upkeep task (`remove_completed()`). The function `run_status_flusher`, which contains the correct logic to delete completed tasks immediately, is defined but never called. Instead, the generic `run_flusher` is used with `flush_status_updates`, which only updates the status. This defeats the PR's goal of reducing database load by deleting completed tasks right away.


metrics::gauge!("grpc_server.flush_status_updates.success").set(success as f64);
metrics::gauge!("grpc_server.flush_status_updates.fail").set(fail as f64);
}
Completed tasks not deleted, only status-updated

High Severity

flush_status_updates in server.rs calls set_status_batch for all statuses, including Complete. The PR goal is to "Delete Completed Tasks Immediately," and the correct implementation exists in status_flusher.rs::flush_buffer, which routes Complete statuses to delete_activation_batch. However, main.rs wires up the wrong function (flush_status_updates), so completed tasks are never deleted — they just get their status set to Complete and remain in the database.
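
A sketch of the routing both reviewers describe (the function name flush_with_deletes is hypothetical, and the real status_flusher.rs::flush_buffer is not shown in this diff; the trait methods match the MockStore impls in src/fetch/tests.rs):

pub async fn flush_with_deletes(
    store: Arc<dyn InflightActivationStore>,
    buffer: &mut Vec<StatusUpdate>,
) {
    let updates = std::mem::take(buffer);
    let mut by_status: HashMap<InflightActivationStatus, Vec<String>> = HashMap::new();
    for (id, status) in updates {
        by_status.entry(status).or_default().push(id);
    }

    for (status, ids) in by_status {
        // Route Complete straight to deletion; batch-update everything else.
        // delete_activation_batch returns a row count, mapped away so both
        // arms yield Result<(), Error>.
        let result = if status == InflightActivationStatus::Complete {
            store.delete_activation_batch(&ids).await.map(|_deleted| ())
        } else {
            store.set_status_batch(&ids, status).await
        };

        if let Err(e) = result {
            error!(?status, error = ?e, "Failed to flush status batch");
            // Re-queue for the next flush, as flush_status_updates does.
            for id in ids {
                buffer.push((id, status));
            }
        }
    }
}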


Reviewed by Cursor Bugbot for commit c96955e.
