feat(taskbroker): Batch Status Updates and Delete Completed Tasks Immediately#618
george-sentry wants to merge 4 commits into main from …eorge/push-taskbroker/batch-updates

Conversation

Since we may want to treat claimed → processing updates the same way, I'm actually going to create a more general `run_flusher`.
```rust
            error = ?e,
            "Failed to flush status batch"
        );
        // Push failed updates back into the buffer so they can be retried on next flush
        for id in ids {
            buffer.push((id, status));
        }
    }
}
```
Bug: The implementation doesn't immediately delete completed tasks. It updates their status to Complete, and a separate upkeep task deletes them later, contrary to the PR's goal.
Severity: HIGH
Suggested Fix
In main.rs, replace the current wiring of the generic `run_flusher` with `flush_status_updates` with a call to the specific `run_status_flusher` function. This ensures that when a task status is `Complete`, the task is immediately deleted from the database as intended.
Prompt for AI Agent
Review the code at the location below. A potential bug has been identified by an AI agent. Verify whether this is a real issue. If it is, propose a fix; if not, explain why it is not valid.
Location: src/grpc/server.rs#L232-L261
Potential issue: The implementation does not immediately delete completed tasks as intended. When a task is marked `Complete`, its status is updated in the database, but the task record is not deleted; deletion only occurs later during a periodic upkeep task (`remove_completed()`). The function `run_status_flusher`, which contains the correct logic to delete completed tasks immediately, is defined but never called. Instead, the generic `run_flusher` is used with `flush_status_updates`, which only updates the status. This defeats the PR's goal of reducing database load by deleting completed tasks right away.
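A minimal sketch of the routing this finding describes: draining a batch of `(id, status)` updates and splitting it so that `Complete` entries become deletes while everything else becomes a status update. The `Status` enum and `partition_batch` helper here are illustrative assumptions, not the PR's actual types:

```rust
#[derive(Clone, Copy, PartialEq, Debug)]
enum Status {
    Processing,
    Complete,
    Failure,
}

/// Split a drained batch of (id, status) pairs into ids to delete
/// (completed tasks) and updates to apply (everything else).
fn partition_batch(batch: Vec<(i64, Status)>) -> (Vec<i64>, Vec<(i64, Status)>) {
    let mut to_delete = Vec::new();
    let mut to_update = Vec::new();
    for (id, status) in batch {
        if status == Status::Complete {
            // Completed tasks are removed immediately instead of
            // waiting for the periodic upkeep pass.
            to_delete.push(id);
        } else {
            to_update.push((id, status));
        }
    }
    (to_delete, to_update)
}

fn main() {
    let batch = vec![
        (1, Status::Complete),
        (2, Status::Processing),
        (3, Status::Complete),
        (4, Status::Failure),
    ];
    let (deletes, updates) = partition_batch(batch);
    println!("delete: {deletes:?}, update: {updates:?}");
}
```

The flusher would then issue one `DELETE ... WHERE id IN (...)` for the first list and one batched `UPDATE` for the second, rather than updating everything and leaving cleanup to upkeep.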
```rust
let placeholders: Vec<String> = (0..ids.len()).map(|i| format!("?{}", i + 2)).collect();
let sql = format!(
    "UPDATE inflight_taskactivations SET status = ?1 WHERE id IN ({})",
    placeholders.join(", ")
);

let mut q = sqlx::query(&sql).bind(status);
for id in ids {
    q = q.bind(id);
```
Bug: The set_status_batch function for SQLite uses incorrect ?N style SQL placeholders instead of the required $N syntax for sqlx, causing status update queries to fail.
Severity: CRITICAL
Suggested Fix
In src/store/adapters/sqlite.rs, modify the set_status_batch function to generate placeholders using the $N syntax (e.g., $2, $3, ...) instead of the ?N syntax. This will align the query with the sqlx SQLite driver's expectations and allow parameter binding to succeed.
Prompt for AI Agent
Review the code at the location below. A potential bug has been identified by an AI agent. Verify whether this is a real issue. If it is, propose a fix; if not, explain why it is not valid.
Location: src/store/adapters/sqlite.rs#L721-L729
Potential issue: The `set_status_batch` function for the SQLite adapter constructs a SQL query using numbered placeholders like `?1`, `?2`. However, the `sqlx` library's SQLite driver expects `$1`, `$2` style placeholders, which are used correctly in other queries within the same file. This mismatch will cause the `UPDATE` query to fail at runtime when using SQLite in push mode, preventing task statuses from being updated in the database.
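Assuming the `$N` style is indeed what the rest of the file uses, the suggested fix amounts to changing the placeholder generator. A hedged sketch (the `batch_placeholders` helper is hypothetical; numbering starts at `$2` because `$1` is reserved for the status value):

```rust
/// Build the placeholder list for a batched status UPDATE, using $N
/// syntax starting at $2 ($1 is bound to the status value).
fn batch_placeholders(n_ids: usize) -> String {
    (0..n_ids)
        .map(|i| format!("${}", i + 2))
        .collect::<Vec<_>>()
        .join(", ")
}

fn main() {
    let sql = format!(
        "UPDATE inflight_taskactivations SET status = $1 WHERE id IN ({})",
        batch_placeholders(3)
    );
    println!("{sql}");
    // UPDATE inflight_taskactivations SET status = $1 WHERE id IN ($2, $3, $4)
}
```

Note that the AI prompt above explicitly asks for verification: SQLite itself accepts several placeholder forms (`?`, `?NNN`, `$AAA`), so confirming which forms the `sqlx` SQLite driver binds correctly, against the file's other queries, is part of validating this finding.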
```rust
/// Run flusher that receives values of type T from a channel and flushes
/// them using the provided async `flush` function either when the batch is
/// full or when the max flush interval has elapsed.
pub async fn run_flusher<T, F>(
```
I created this function because I'm also planning to batch claimed → processing updates in the push pool, which will use basically identical machinery.
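A simplified, std-only sketch of the batching idea behind `run_flusher`. The real function is async and presumably built on tokio channels and timers; the channel type, batch size, and interval here are illustrative assumptions:

```rust
use std::sync::mpsc::{channel, Receiver, RecvTimeoutError};
use std::time::Duration;

/// Collect values from a channel and flush them either when the batch
/// is full or when the max flush interval has elapsed.
fn run_flusher<T>(
    rx: Receiver<T>,
    max_batch: usize,
    max_interval: Duration,
    mut flush: impl FnMut(Vec<T>),
) {
    let mut buffer = Vec::new();
    loop {
        match rx.recv_timeout(max_interval) {
            Ok(v) => {
                buffer.push(v);
                if buffer.len() >= max_batch {
                    flush(std::mem::take(&mut buffer));
                }
            }
            // Interval elapsed: flush whatever has accumulated.
            Err(RecvTimeoutError::Timeout) => {
                if !buffer.is_empty() {
                    flush(std::mem::take(&mut buffer));
                }
            }
            // All senders dropped: flush the remainder and stop.
            Err(RecvTimeoutError::Disconnected) => {
                if !buffer.is_empty() {
                    flush(buffer);
                }
                return;
            }
        }
    }
}

fn main() {
    let (tx, rx) = channel();
    for i in 0..5 {
        tx.send(i).unwrap();
    }
    drop(tx);
    run_flusher(rx, 2, Duration::from_millis(10), |batch: Vec<i32>| {
        println!("flushed {batch:?}");
    });
}
```

Keeping the flush callback generic over `T` is what lets the same machinery serve both status updates and the planned claimed → processing batching.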
Cursor Bugbot has reviewed your changes and found 2 potential issues.
Reviewed by Cursor Bugbot for commit c96955e. Configure here.
```rust
    metrics::gauge!(format!("grpc_server.flush_status_updates.success")).set(success as f64);
    metrics::gauge!(format!("grpc_server.flush_status_updates.fail")).set(fail as f64);
}
```
Completed tasks not deleted, only status-updated
High Severity
flush_status_updates in server.rs calls set_status_batch for all statuses, including Complete. The PR goal is to "Delete Completed Tasks Immediately," and the correct implementation exists in status_flusher.rs::flush_buffer, which routes Complete statuses to delete_activation_batch. However, main.rs wires up the wrong function (flush_status_updates), so completed tasks are never deleted — they just get their status set to Complete and remain in the database.
Additional Locations (1)
```rust
    }

    metrics::histogram!("status_flush.flush.duration").record(start.elapsed());
}
```
Entire status_flusher module is unused dead code
Medium Severity
The status_flusher module is declared in grpc/mod.rs and contains run_status_flusher and flush_buffer with the correct delete-on-complete logic, but nothing in the codebase imports or calls any of its exports. main.rs uses the generic flusher::run_flusher combined with flush_status_updates from server.rs instead. This leaves the correct implementation as dead code and a duplicate StatusUpdate type alias defined in both server.rs and status_flusher.rs.
Additional Locations (1)
Linear
Completes STREAM-918
Description
On the usual workload of 100 millisecond tasks, with the new "claimed" status, we can do around 5K tasks per second in the sandbox. By batching status updates, we reduce DB load, making all queries take less time. This can increase throughput by 1K to 2K tasks per second.
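As a rough back-of-the-envelope for why batching reduces DB load (the 5K tasks/second figure is from the description above; the batch size is an assumed illustration, not a value stated in the PR):

```rust
fn main() {
    // Assumed batch size for illustration; the PR does not state the real value.
    let batch_size: u32 = 100;
    // Throughput cited in the PR description.
    let tasks_per_sec: u32 = 5_000;

    // Without batching: one status UPDATE per task.
    let unbatched_queries = tasks_per_sec;
    // With batching: roughly one UPDATE per full batch.
    let batched_queries = tasks_per_sec / batch_size;

    println!("{unbatched_queries} queries/s -> {batched_queries} queries/s");
}
```

Under these assumptions the status-update query rate drops by two orders of magnitude, which is where the claimed reduction in per-query latency, and the extra 1K to 2K tasks per second, would come from.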