Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion crates/taskito-async/Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[package]
name = "taskito-async"
version = "0.6.0"
version = "0.7.0"
edition = "2021"

[dependencies]
Expand All @@ -9,3 +9,4 @@ pyo3 = { workspace = true }
tokio = { workspace = true }
crossbeam-channel = { workspace = true }
async-trait = { workspace = true }
log = { workspace = true }
5 changes: 3 additions & 2 deletions crates/taskito-async/src/pool.rs
Original file line number Diff line number Diff line change
Expand Up @@ -84,9 +84,10 @@ impl WorkerDispatcher for NativeAsyncPool {
&job.queue,
),
) {
eprintln!(
log::error!(
"[taskito] Failed to submit async task {}[{}]: {e}",
job.task_name, job.id
job.task_name,
job.id
);
let _ = result_tx.send(JobResult::Failure {
job_id: job.id.clone(),
Expand Down
6 changes: 3 additions & 3 deletions crates/taskito-async/src/task_executor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ pub fn execute_sync_task(
let max_retries = job.max_retries;

let start = std::time::Instant::now();
eprintln!("[taskito] Task {task_name}[{job_id}] received");
log::info!("[taskito] Task {task_name}[{job_id}] received");

let result =
Python::with_gil(|py| -> PyResult<Option<Vec<u8>>> { run_task(py, task_registry, job) });
Expand All @@ -32,7 +32,7 @@ pub fn execute_sync_task(
let job_result = match result {
Ok(result_bytes) => {
let secs = start.elapsed().as_secs_f64();
eprintln!("[taskito] Task {task_name}[{job_id}] succeeded in {secs:.3}s");
log::info!("[taskito] Task {task_name}[{job_id}] succeeded in {secs:.3}s");
JobResult::Success {
job_id,
result: result_bytes,
Expand All @@ -59,7 +59,7 @@ pub fn execute_sync_task(
check_should_retry(py, retry_filters, &task_name, &exc_class_name, &e)
});

eprintln!("[taskito] Task {task_name}[{job_id}] failed: {error_msg}");
log::error!("[taskito] Task {task_name}[{job_id}] failed: {error_msg}");
JobResult::Failure {
job_id,
error: error_msg,
Expand Down
2 changes: 1 addition & 1 deletion crates/taskito-core/Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[package]
name = "taskito-core"
version = "0.6.0"
version = "0.7.0"
edition = "2021"

[features]
Expand Down
163 changes: 138 additions & 25 deletions crates/taskito-core/src/resilience/circuit_breaker.rs
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,10 @@ pub struct CircuitBreakerConfig {
pub threshold: i32,
pub window_ms: i64,
pub cooldown_ms: i64,
/// Number of probe requests allowed in HalfOpen state (default: 5).
pub half_open_max_probes: i32,
/// Required success rate (0.0–1.0) to close from HalfOpen (default: 0.8 = 80%).
pub half_open_success_rate: f64,
}

/// Circuit breaker manager backed by SQLite.
Expand Down Expand Up @@ -65,10 +69,13 @@ impl CircuitBreaker {
// Check if cooldown has elapsed
let opened = row.opened_at.unwrap_or(0);
if now.saturating_sub(opened) >= row.cooldown_ms {
// Transition to half-open: allow one probe
// Transition to half-open: reset probe counters
let updated = CircuitBreakerRow {
state: CircuitState::HalfOpen as i32,
half_open_at: Some(now),
half_open_probe_count: 0,
half_open_success_count: 0,
half_open_failure_count: 0,
..row
};
self.storage.upsert_circuit_breaker(&updated)?;
Expand All @@ -78,33 +85,106 @@ impl CircuitBreaker {
}
}
CircuitState::HalfOpen => {
// Only one probe at a time — block others
Ok(false)
// Allow up to max_probes concurrent probes
if row.half_open_probe_count < row.half_open_max_probes {
let updated = CircuitBreakerRow {
half_open_probe_count: row.half_open_probe_count + 1,
..row
};
self.storage.upsert_circuit_breaker(&updated)?;
Ok(true)
} else {
// Check for timeout: if probes haven't completed within cooldown, re-open
let half_open_since = row.half_open_at.unwrap_or(0);
if now.saturating_sub(half_open_since) >= row.cooldown_ms {
let updated = CircuitBreakerRow {
state: CircuitState::Open as i32,
opened_at: Some(now),
half_open_at: None,
half_open_probe_count: 0,
half_open_success_count: 0,
half_open_failure_count: 0,
..row
};
self.storage.upsert_circuit_breaker(&updated)?;
}
Ok(false)
}
}
}
}

/// Record a task success. Resets the circuit breaker to closed.
/// Record a task success. In HalfOpen, tracks probes and closes when
/// the success rate threshold is met.
pub fn record_success(&self, task_name: &str) -> Result<()> {
let row = match self.storage.get_circuit_breaker(task_name)? {
Some(r) => r,
None => return Ok(()),
};

let state = CircuitState::from_i32(row.state);
if state == CircuitState::Closed && row.failure_count == 0 {
return Ok(()); // Nothing to do
}

let updated = CircuitBreakerRow {
state: CircuitState::Closed as i32,
failure_count: 0,
opened_at: None,
half_open_at: None,
..row
};
self.storage.upsert_circuit_breaker(&updated)?;
Ok(())
match state {
CircuitState::Closed if row.failure_count == 0 => Ok(()),
CircuitState::HalfOpen => {
let successes = row.half_open_success_count + 1;
let total = successes + row.half_open_failure_count;

if total >= row.half_open_max_probes {
let rate = successes as f64 / total as f64;
if rate >= row.half_open_success_rate {
// Threshold met — close the circuit
let updated = CircuitBreakerRow {
state: CircuitState::Closed as i32,
failure_count: 0,
opened_at: None,
half_open_at: None,
half_open_probe_count: 0,
half_open_success_count: 0,
half_open_failure_count: 0,
..row
};
self.storage.upsert_circuit_breaker(&updated)?;
} else {
// Threshold not met — re-open
let now = now_millis();
let updated = CircuitBreakerRow {
state: CircuitState::Open as i32,
opened_at: Some(now),
half_open_at: None,
half_open_probe_count: 0,
half_open_success_count: 0,
half_open_failure_count: 0,
..row
};
self.storage.upsert_circuit_breaker(&updated)?;
}
} else {
// Still collecting samples
let updated = CircuitBreakerRow {
half_open_success_count: successes,
..row
};
self.storage.upsert_circuit_breaker(&updated)?;
}
Ok(())
}
_ => {
// Closed with failures or Open — reset to clean Closed
let updated = CircuitBreakerRow {
state: CircuitState::Closed as i32,
failure_count: 0,
opened_at: None,
half_open_at: None,
half_open_probe_count: 0,
half_open_success_count: 0,
half_open_failure_count: 0,
..row
};
self.storage.upsert_circuit_breaker(&updated)?;
Ok(())
}
}
}

/// Record a task failure. May trip the breaker open.
Expand All @@ -120,16 +200,44 @@ impl CircuitBreaker {

match state {
CircuitState::HalfOpen => {
// Probe failed — go back to open
let updated = CircuitBreakerRow {
state: CircuitState::Open as i32,
failure_count: row.failure_count.saturating_add(1),
last_failure_at: Some(now),
opened_at: Some(now),
half_open_at: None,
..row
let failures = row.half_open_failure_count + 1;
let total = row.half_open_success_count + failures;

// Check if the success rate can still be met
let remaining = row.half_open_max_probes - total;
let best_possible_rate = if row.half_open_max_probes > 0 {
(row.half_open_success_count + remaining) as f64
/ row.half_open_max_probes as f64
} else {
0.0
};
self.storage.upsert_circuit_breaker(&updated)?;

if total >= row.half_open_max_probes
|| best_possible_rate < row.half_open_success_rate
{
// Either all samples collected and failed, or impossible to meet threshold
let updated = CircuitBreakerRow {
state: CircuitState::Open as i32,
failure_count: row.failure_count.saturating_add(1),
last_failure_at: Some(now),
opened_at: Some(now),
half_open_at: None,
half_open_probe_count: 0,
half_open_success_count: 0,
half_open_failure_count: 0,
..row
};
self.storage.upsert_circuit_breaker(&updated)?;
} else {
// Still collecting samples
let updated = CircuitBreakerRow {
failure_count: row.failure_count.saturating_add(1),
last_failure_at: Some(now),
half_open_failure_count: failures,
..row
};
self.storage.upsert_circuit_breaker(&updated)?;
}
}
CircuitState::Closed => {
// Reset count if outside the window
Expand Down Expand Up @@ -193,6 +301,11 @@ impl CircuitBreaker {
threshold: config.threshold,
window_ms: config.window_ms,
cooldown_ms: config.cooldown_ms,
half_open_max_probes: config.half_open_max_probes,
half_open_success_rate: config.half_open_success_rate,
half_open_probe_count: 0,
half_open_success_count: 0,
half_open_failure_count: 0,
};
self.storage.upsert_circuit_breaker(&row)?;
Ok(())
Expand Down
10 changes: 9 additions & 1 deletion crates/taskito-core/src/scheduler/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,7 @@ pub enum ResultOutcome {
Retry {
job_id: String,
task_name: String,
queue: String,
error: String,
retry_count: i32,
timed_out: bool,
Expand All @@ -87,11 +88,16 @@ pub enum ResultOutcome {
DeadLettered {
job_id: String,
task_name: String,
queue: String,
error: String,
timed_out: bool,
},
/// Task was cancelled during execution.
Cancelled { job_id: String, task_name: String },
Cancelled {
job_id: String,
task_name: String,
queue: String,
},
}

/// Per-task configuration for retry, rate limiting, and circuit breaker.
Expand Down Expand Up @@ -452,6 +458,8 @@ mod tests {
threshold: 1,
window_ms: 60_000,
cooldown_ms: 300_000,
half_open_max_probes: 5,
half_open_success_rate: 0.8,
};
scheduler.register_task(
"cb_task".to_string(),
Expand Down
23 changes: 22 additions & 1 deletion crates/taskito-core/src/scheduler/result_handler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,14 @@ impl Scheduler {
log::error!("circuit breaker error for {task_name}: {e}");
}

// Look up the job to get the queue name for middleware context
let queue = self
.storage
.get_job(&job_id)?
.as_ref()
.map(|j| j.queue.clone())
.unwrap_or_default();

// If should_retry is false (exception filtering), skip straight to DLQ
if !should_retry {
match self.storage.get_job(&job_id)? {
Expand All @@ -80,6 +88,7 @@ impl Scheduler {
return Ok(ResultOutcome::DeadLettered {
job_id,
task_name,
queue,
error,
timed_out,
});
Expand All @@ -103,6 +112,7 @@ impl Scheduler {
Ok(ResultOutcome::Retry {
job_id,
task_name,
queue,
error,
retry_count,
timed_out,
Expand All @@ -116,6 +126,7 @@ impl Scheduler {
Ok(ResultOutcome::DeadLettered {
job_id,
task_name,
queue,
error,
timed_out,
})
Expand All @@ -140,7 +151,17 @@ impl Scheduler {
{
error!("failed to record metric for cancelled job {job_id}: {e}");
}
Ok(ResultOutcome::Cancelled { job_id, task_name })
let queue = self
.storage
.get_job(&job_id)?
.as_ref()
.map(|j| j.queue.clone())
.unwrap_or_default();
Ok(ResultOutcome::Cancelled {
job_id,
task_name,
queue,
})
}
}
}
Expand Down
18 changes: 18 additions & 0 deletions crates/taskito-core/src/storage/models.rs
Original file line number Diff line number Diff line change
Expand Up @@ -272,6 +272,24 @@ pub struct CircuitBreakerRow {
pub threshold: i32,
pub window_ms: i64,
pub cooldown_ms: i64,
#[serde(default = "default_max_probes")]
pub half_open_max_probes: i32,
#[serde(default = "default_success_rate")]
pub half_open_success_rate: f64,
#[serde(default)]
pub half_open_probe_count: i32,
#[serde(default)]
pub half_open_success_count: i32,
#[serde(default)]
pub half_open_failure_count: i32,
}

/// Serde fallback for `CircuitBreakerRow::half_open_max_probes`, used when
/// deserializing rows persisted before the half-open probe fields existed.
fn default_max_probes() -> i32 {
    // Keep in sync with the documented default on `CircuitBreakerConfig`.
    const DEFAULT_MAX_PROBES: i32 = 5;
    DEFAULT_MAX_PROBES
}

/// Serde fallback for `CircuitBreakerRow::half_open_success_rate`, used when
/// deserializing rows persisted before the half-open probe fields existed.
fn default_success_rate() -> f64 {
    // 0.8 == 80% of probes must succeed to close the circuit; keep in sync
    // with the documented default on `CircuitBreakerConfig`.
    const DEFAULT_SUCCESS_RATE: f64 = 0.8;
    DEFAULT_SUCCESS_RATE
}

// ── Workers ──────────────────────────────────────────────────────
Expand Down
Loading
Loading