feat(taskbroker): Batch Status Updates and Delete Completed Tasks Immediately#618

Open
george-sentry wants to merge 4 commits intomainfrom
george/push-taskbroker/batch-updates

Conversation

@george-sentry
Member

@george-sentry george-sentry commented Apr 30, 2026

Linear

Completes STREAM-918

Description

On the usual workload of 100-millisecond tasks, the new "claimed" status lets us process around 5K tasks per second in the sandbox. Batching status updates reduces DB load, so every query takes less time. This can increase throughput by 1K to 2K tasks per second.

@george-sentry george-sentry requested a review from a team as a code owner April 30, 2026 21:02
@linear-code

linear-code Bot commented Apr 30, 2026

Comment thread src/main.rs
Comment thread src/main.rs Outdated
Comment thread src/main.rs Outdated
Comment thread src/main.rs
Comment thread src/main.rs
@george-sentry george-sentry marked this pull request as draft April 30, 2026 23:24
@george-sentry
Member Author

Since we may want to treat claimed → processing updates the same way, I'm actually going to create a more general Flusher struct that can be used by both push threads and the gRPC server.

@george-sentry george-sentry marked this pull request as ready for review May 1, 2026 08:02
Comment thread src/grpc/server.rs
Comment on lines +251 to +261
                error = ?e,
                "Failed to flush status batch"
            );

            // Push failed updates back into the buffer so they can be retried on next flush
            for id in ids {
                buffer.push((id, status));
            }
        }
    }
}

Bug: The implementation doesn't immediately delete completed tasks. It updates their status to Complete, and a separate upkeep task deletes them later, contrary to the PR's goal.
Severity: HIGH

Suggested Fix

In main.rs, replace the use of the generic run_flusher with flush_status_updates with a call to the specific run_status_flusher function. This will ensure that when a task status is Complete, it is immediately deleted from the database as intended.

Prompt for AI Agent
Review the code at the location below. A potential bug has been identified by an AI
agent. Verify if this is a real issue. If it is, propose a fix; if not, explain why it's
not valid.

Location: src/grpc/server.rs#L232-L261

Potential issue: The implementation does not immediately delete completed tasks as
intended. When a task is marked `Complete`, its status is updated in the database, but
the task record is not deleted. Deletion only occurs later during a periodic upkeep task
(`remove_completed()`). The function `run_status_flusher`, which contains the correct
logic to delete completed tasks immediately, is defined but never called. Instead, the
generic `run_flusher` is used with `flush_status_updates`, which only updates the
status. This defeats the PR's goal of reducing database load by deleting completed tasks
right away.
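
To make the routing described above concrete, here is a minimal sketch of splitting one flushed buffer into a delete batch and per-status update batches. `Status`, `FlushPlan`, `plan_flush`, and the `String` id type are hypothetical stand-ins chosen for illustration; the PR's actual types and the body of `run_status_flusher` may differ.

```rust
/// Hypothetical status enum; the real one in taskbroker likely has more variants.
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
pub enum Status {
    Processing,
    Failure,
    Complete,
}

/// Work for the store after splitting one flushed buffer.
#[derive(Debug, Default, PartialEq, Eq)]
pub struct FlushPlan {
    /// Ids whose rows should be deleted in one batch (status was Complete).
    pub delete: Vec<String>,
    /// Ids grouped by any other status, one batched UPDATE per group.
    pub update: Vec<(Status, Vec<String>)>,
}

/// Split a buffer of (id, status) pairs into deletes and grouped updates,
/// preserving the order in which ids arrived within each group.
pub fn plan_flush(buffer: Vec<(String, Status)>) -> FlushPlan {
    let mut plan = FlushPlan::default();
    for (id, status) in buffer {
        if status == Status::Complete {
            plan.delete.push(id);
            continue;
        }
        // Append to the existing group for this status, or start a new one.
        match plan.update.iter().position(|(s, _)| *s == status) {
            Some(i) => plan.update[i].1.push(id),
            None => plan.update.push((status, vec![id])),
        }
    }
    plan
}
```

With a split like this, the caller would issue one batched delete for `plan.delete` and one batched status update per entry in `plan.update`, so a `Complete` task never goes through the update path.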

Comment on lines +721 to +729
let placeholders: Vec<String> = (0..ids.len()).map(|i| format!("?{}", i + 2)).collect();
let sql = format!(
    "UPDATE inflight_taskactivations SET status = ?1 WHERE id IN ({})",
    placeholders.join(", ")
);

let mut q = sqlx::query(&sql).bind(status);
for id in ids {
    q = q.bind(id);

Bug: The set_status_batch function for SQLite uses incorrect ?N style SQL placeholders instead of the required $N syntax for sqlx, causing status update queries to fail.
Severity: CRITICAL

Suggested Fix

In src/store/adapters/sqlite.rs, modify the set_status_batch function to generate placeholders using the $N syntax (e.g., $2, $3, ...) instead of the ?N syntax. This will align the query with the sqlx SQLite driver's expectations and allow parameter binding to succeed.

Prompt for AI Agent
Review the code at the location below. A potential bug has been identified by an AI
agent. Verify if this is a real issue. If it is, propose a fix; if not, explain why it's
not valid.

Location: src/store/adapters/sqlite.rs#L721-L729

Potential issue: The `set_status_batch` function for the SQLite adapter constructs a SQL
query using numbered placeholders like `?1`, `?2`. However, the `sqlx` library's SQLite
driver expects `$1`, `$2` style placeholders, which are used correctly in other queries
within the same file. This mismatch will cause the `UPDATE` query to fail at runtime
when using SQLite in push mode, preventing task statuses from being updated in the
database.
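
For reference, here is a small sketch of the placeholder generation in question, parameterised over the sigil so either style can be exercised in a test. SQLite's documentation lists `?NNN` as a numbered parameter form and `$AAAA` as a named one; whether the sqlx SQLite driver binds both positionally is an assumption to verify against a real database, not something this sketch establishes. `PlaceholderStyle` and `build_set_status_sql` are hypothetical names.

```rust
/// Placeholder sigil to emit. Which forms a given driver accepts should be
/// checked against that driver; this type only controls the generated text.
#[derive(Clone, Copy)]
pub enum PlaceholderStyle {
    /// `?2`, `?3`, ... as in the snippet under review.
    Question,
    /// `$2`, `$3`, ... as the review comment suggests.
    Dollar,
}

/// Build the batch UPDATE text for `n_ids` ids. Slot 1 is the status and
/// ids start at slot 2. Returns `None` for an empty batch, which needs no query.
pub fn build_set_status_sql(n_ids: usize, style: PlaceholderStyle) -> Option<String> {
    if n_ids == 0 {
        return None;
    }
    let sigil = match style {
        PlaceholderStyle::Question => '?',
        PlaceholderStyle::Dollar => '$',
    };
    let placeholders: Vec<String> = (0..n_ids)
        .map(|i| format!("{sigil}{}", i + 2))
        .collect();
    Some(format!(
        "UPDATE inflight_taskactivations SET status = {sigil}1 WHERE id IN ({})",
        placeholders.join(", ")
    ))
}
```

Keeping the text generation separate from the binding makes it cheap to run both variants through an integration test against SQLite and confirm which ones the driver accepts before changing the adapter.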

Comment thread src/flusher.rs
/// Run flusher that receives values of type T from a channel and flushes
/// them using the provided async `flush` function either when the batch is
/// full or when the max flush interval has elapsed.
pub async fn run_flusher<T, F>(
Member Author

I created this function because I'm also planning to batch claimed → processing updates in the push pool, which will use basically identical machinery.
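
As an illustration of the flush-on-full-or-interval behaviour described in the doc comment, here is a minimal sketch. It is deliberately synchronous and uses only `std::sync::mpsc`, so it is not the PR's async `run_flusher`; `run_flusher_sync`, its parameter names, and `batch_sizes_demo` are hypothetical.

```rust
use std::sync::mpsc::{self, Receiver, RecvTimeoutError};
use std::time::{Duration, Instant};

/// Hypothetical synchronous stand-in for an async flusher.
/// Drains `rx` into a buffer and calls `flush` when the buffer reaches
/// `max_batch` items or `max_interval` has passed since the last flush.
/// Returns once every sender has been dropped, after a final flush.
pub fn run_flusher_sync<T, F>(
    rx: Receiver<T>,
    max_batch: usize,
    max_interval: Duration,
    mut flush: F,
) where
    F: FnMut(Vec<T>),
{
    let mut buffer: Vec<T> = Vec::with_capacity(max_batch);
    let mut deadline = Instant::now() + max_interval;

    loop {
        // Wait for the next item, but no longer than the time left in this interval.
        let wait = deadline.saturating_duration_since(Instant::now());
        match rx.recv_timeout(wait) {
            Ok(item) => {
                buffer.push(item);
                if buffer.len() < max_batch {
                    continue;
                }
            }
            Err(RecvTimeoutError::Timeout) => {}
            Err(RecvTimeoutError::Disconnected) => {
                if !buffer.is_empty() {
                    flush(std::mem::take(&mut buffer));
                }
                return;
            }
        }

        // Reached on a full batch or an elapsed interval.
        if !buffer.is_empty() {
            flush(std::mem::take(&mut buffer));
        }
        deadline = Instant::now() + max_interval;
    }
}

/// Small demo: five queued items with a batch size of two.
pub fn batch_sizes_demo() -> Vec<usize> {
    let (tx, rx) = mpsc::channel();
    for i in 0..5 {
        tx.send(i).expect("receiver is still alive");
    }
    drop(tx); // closing the channel triggers the final flush

    let mut sizes = Vec::new();
    run_flusher_sync(rx, 2, Duration::from_secs(60), |batch: Vec<i32>| {
        sizes.push(batch.len())
    });
    sizes
}
```

Because the flush callback is generic over `T`, the same loop could serve both status updates and claimed → processing updates; a retrying variant would hand failed items back to the loop instead of dropping them.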


@cursor cursor Bot left a comment

Cursor Bugbot has reviewed your changes and found 2 potential issues.

Reviewed by Cursor Bugbot for commit c96955e.

Comment thread src/grpc/server.rs

metrics::gauge!(format!("grpc_server.flush_status_updates.success")).set(success as f64);
metrics::gauge!(format!("grpc_server.flush_status_updates.fail")).set(fail as f64);
}

Completed tasks not deleted, only status-updated

High Severity

flush_status_updates in server.rs calls set_status_batch for all statuses, including Complete. The PR goal is to "Delete Completed Tasks Immediately," and the correct implementation exists in status_flusher.rs::flush_buffer, which routes Complete statuses to delete_activation_batch. However, main.rs wires up the wrong function (flush_status_updates), so completed tasks are never deleted — they just get their status set to Complete and remain in the database.

}

metrics::histogram!("status_flush.flush.duration").record(start.elapsed());
}

Entire status_flusher module is unused dead code

Medium Severity

The status_flusher module is declared in grpc/mod.rs and contains run_status_flusher and flush_buffer with the correct delete-on-complete logic, but nothing in the codebase imports or calls any of its exports. main.rs uses the generic flusher::run_flusher combined with flush_status_updates from server.rs instead. This leaves the correct implementation as dead code and a duplicate StatusUpdate type alias defined in both server.rs and status_flusher.rs.

