From f148b31b61b703acd431db8043778d667729cf66 Mon Sep 17 00:00:00 2001 From: Mattbusel Date: Wed, 11 Mar 2026 05:20:45 -0400 Subject: [PATCH 1/3] feat(service): add CircuitBreaker middleware MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit gRPC channels can fail silently — a downstream that is overwhelmed or crashing will keep accepting connections and returning errors rather than refusing them. Without a circuit breaker, callers retry into a failing service, wasting resources and extending outages. Add tonic::service::circuit_breaker::{CircuitBreaker, CircuitBreakerLayer}: - Three-state machine: Closed → Open → HalfOpen - Closed: requests flow through normally - Open: requests are immediately rejected with Status::unavailable ("circuit breaker is open") until `timeout` elapses - HalfOpen: one probe request is allowed; a windowed success rate at or above `success_threshold` closes the circuit, any failure reopens it The implementation is pure Tower middleware — no async runtime dependency in poll_ready/call, state guarded by a std::sync::Mutex that is held only for brief state updates. Usage: let channel = ServiceBuilder::new() .layer(CircuitBreakerLayer::new(5, 0.6, Duration::from_secs(30))) .service(channel); let mut client = MyClient::new(channel); Uses pin_project and tower_layer/tower_service already in [dependencies]. --- tonic/src/service/circuit_breaker.rs | 317 +++++++++++++++++++++++++++ tonic/src/service/mod.rs | 3 + 2 files changed, 320 insertions(+) create mode 100644 tonic/src/service/circuit_breaker.rs diff --git a/tonic/src/service/circuit_breaker.rs b/tonic/src/service/circuit_breaker.rs new file mode 100644 index 000000000..547416205 --- /dev/null +++ b/tonic/src/service/circuit_breaker.rs @@ -0,0 +1,317 @@ +//! Circuit-breaker middleware for tonic services. +//! +//! Wraps any Tower [`Service`] and prevents calls to a struggling downstream +//! when too many consecutive failures have been observed, returning +//! 
[`Status::unavailable`] immediately until the service shows signs of +//! recovery. +//! +//! # State machine +//! +//! ```text +//! ┌────────┐ consecutive_failures >= threshold ┌──────┐ +//! │ Closed │ ─────────────────────────────────► │ Open │ +//! └────────┘ └──────┘ +//! ▲ │ +//! │ success_rate >= success_threshold │ timeout elapsed +//! │ ▼ +//! └────────────────────────────────── ┌──────────┐ +//! │ HalfOpen │ +//! └──────────┘ +//! ``` +//! +//! # Example +//! +//! ```rust,ignore +//! use tonic::service::circuit_breaker::CircuitBreakerLayer; +//! use tower::ServiceBuilder; +//! use std::time::Duration; +//! +//! let channel = tonic::transport::Channel::from_static("http://[::1]:50051") +//! .connect() +//! .await?; +//! +//! let channel = ServiceBuilder::new() +//! .layer(CircuitBreakerLayer::new(5, 0.6, Duration::from_secs(30))) +//! .service(channel); +//! +//! let mut client = MyServiceClient::new(channel); +//! ``` + +use std::{ + fmt, + future::Future, + pin::Pin, + sync::{Arc, Mutex}, + task::{Context, Poll}, + time::{Duration, Instant}, +}; + +use pin_project::pin_project; +use tower_layer::Layer; +use tower_service::Service; + +use crate::Status; + +// ── State machine ───────────────────────────────────────────────────────────── + +#[derive(Debug, Clone, PartialEq, Eq)] +enum CircuitState { + /// Normal operation — requests flow through. + Closed, + /// Too many failures — requests are rejected with `Status::unavailable`. + Open, + /// One probe request allowed through to test recovery. + HalfOpen, +} + +#[derive(Debug)] +struct State { + status: CircuitState, + consecutive_failures: usize, + last_failure: Option, + last_transition: Instant, + /// Sliding window of outcomes: `true` = success, `false` = failure. 
+ window: Vec, +} + +impl State { + fn new() -> Self { + Self { + status: CircuitState::Closed, + consecutive_failures: 0, + last_failure: None, + last_transition: Instant::now(), + window: Vec::with_capacity(100), + } + } + + fn push(&mut self, success: bool) { + self.window.push(success); + if self.window.len() > 100 { + self.window.remove(0); + } + } + + fn success_rate(&self) -> f64 { + if self.window.is_empty() { + return 0.0; + } + self.window.iter().filter(|&&v| v).count() as f64 / self.window.len() as f64 + } +} + +// ── Layer ───────────────────────────────────────────────────────────────────── + +/// [`Layer`] that applies [`CircuitBreaker`] middleware. +/// +/// [`Layer`]: tower_layer::Layer +#[derive(Clone, Debug)] +pub struct CircuitBreakerLayer { + failure_threshold: usize, + success_threshold: f64, + timeout: Duration, +} + +impl CircuitBreakerLayer { + /// Create a new [`CircuitBreakerLayer`]. + /// + /// - `failure_threshold`: consecutive failures before opening the circuit. + /// - `success_threshold`: fraction of successes in the sliding window required to close + /// the circuit from [`HalfOpen`] (e.g. `0.6` means 60%). + /// - `timeout`: how long to wait in [`Open`] state before probing with a single request. + /// + /// [`HalfOpen`]: CircuitState::HalfOpen + /// [`Open`]: CircuitState::Open + pub fn new(failure_threshold: usize, success_threshold: f64, timeout: Duration) -> Self { + Self { + failure_threshold, + success_threshold, + timeout, + } + } +} + +impl Layer for CircuitBreakerLayer { + type Service = CircuitBreaker; + + fn layer(&self, inner: S) -> Self::Service { + CircuitBreaker::new( + inner, + self.failure_threshold, + self.success_threshold, + self.timeout, + ) + } +} + +// ── Service ─────────────────────────────────────────────────────────────────── + +/// Circuit-breaker middleware for tonic services. +/// +/// See the [module documentation](self) for a usage example. 
+#[derive(Clone)] +pub struct CircuitBreaker { + inner: S, + state: Arc>, + failure_threshold: usize, + success_threshold: f64, + timeout: Duration, +} + +impl fmt::Debug for CircuitBreaker { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + f.debug_struct("CircuitBreaker") + .field("inner", &self.inner) + .field("failure_threshold", &self.failure_threshold) + .field("success_threshold", &self.success_threshold) + .field("timeout", &self.timeout) + .finish() + } +} + +impl CircuitBreaker { + /// Wrap `inner` with circuit-breaker protection. + pub fn new( + inner: S, + failure_threshold: usize, + success_threshold: f64, + timeout: Duration, + ) -> Self { + Self { + inner, + state: Arc::new(Mutex::new(State::new())), + failure_threshold, + success_threshold, + timeout, + } + } +} + +impl Service for CircuitBreaker +where + S: Service + Clone + Send + 'static, + S::Future: Send + 'static, + S::Error: Into + Send + 'static, + S::Response: Send + 'static, + Req: Send + 'static, +{ + type Response = S::Response; + type Error = crate::BoxError; + type Future = CircuitBreakerFuture; + + fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll> { + // Gate: check circuit state before advertising readiness. + { + let mut s = self.state.lock().unwrap(); + match s.status { + CircuitState::Open => { + let elapsed = s + .last_failure + .map(|t| t.elapsed()) + .unwrap_or(Duration::ZERO); + + if elapsed < self.timeout { + return Poll::Ready(Err( + Status::unavailable("circuit breaker is open").into() + )); + } + + // Timeout elapsed — probe with a single request. 
+ s.status = CircuitState::HalfOpen; + s.window.clear(); + s.consecutive_failures = 0; + s.last_transition = Instant::now(); + } + CircuitState::Closed | CircuitState::HalfOpen => {} + } + } + + self.inner.poll_ready(cx).map_err(Into::into) + } + + fn call(&mut self, req: Req) -> Self::Future { + let state = self.state.clone(); + let failure_threshold = self.failure_threshold; + let success_threshold = self.success_threshold; + + let mut inner = self.inner.clone(); + std::mem::swap(&mut inner, &mut self.inner); + + CircuitBreakerFuture { + inner: inner.call(req), + state, + failure_threshold, + success_threshold, + _marker: std::marker::PhantomData, + } + } +} + +// ── Future ──────────────────────────────────────────────────────────────────── + +/// Response future for [`CircuitBreaker`]. +#[pin_project] +pub struct CircuitBreakerFuture { + #[pin] + inner: F, + state: Arc>, + failure_threshold: usize, + success_threshold: f64, + _marker: std::marker::PhantomData, +} + +impl Future for CircuitBreakerFuture +where + F: Future>, + E: Into, +{ + type Output = Result; + + fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { + let this = self.project(); + let failure_threshold = *this.failure_threshold; + let success_threshold = *this.success_threshold; + + match this.inner.poll(cx) { + Poll::Pending => Poll::Pending, + Poll::Ready(Ok(resp)) => { + let mut s = this.state.lock().unwrap(); + s.push(true); + match s.status { + CircuitState::HalfOpen => { + if s.success_rate() >= success_threshold { + s.status = CircuitState::Closed; + s.consecutive_failures = 0; + s.last_transition = Instant::now(); + } + } + CircuitState::Closed => { + s.consecutive_failures = 0; + } + CircuitState::Open => {} + } + Poll::Ready(Ok(resp)) + } + Poll::Ready(Err(e)) => { + let mut s = this.state.lock().unwrap(); + s.push(false); + s.consecutive_failures += 1; + s.last_failure = Some(Instant::now()); + match s.status { + CircuitState::Closed => { + if s.consecutive_failures >= 
failure_threshold { + s.status = CircuitState::Open; + s.last_transition = Instant::now(); + } + } + CircuitState::HalfOpen => { + s.status = CircuitState::Open; + s.last_transition = Instant::now(); + } + CircuitState::Open => {} + } + Poll::Ready(Err(e.into())) + } + } + } +} diff --git a/tonic/src/service/mod.rs b/tonic/src/service/mod.rs index c918c0b11..412ee93d9 100644 --- a/tonic/src/service/mod.rs +++ b/tonic/src/service/mod.rs @@ -1,10 +1,13 @@ //! Utilities for using Tower services with Tonic. +pub mod circuit_breaker; pub mod interceptor; pub(crate) mod layered; #[cfg(feature = "router")] pub(crate) mod router; +#[doc(inline)] +pub use self::circuit_breaker::{CircuitBreaker, CircuitBreakerLayer}; #[doc(inline)] pub use self::interceptor::{Interceptor, InterceptorLayer}; pub use self::layered::{LayerExt, Layered}; From 48bcaea9f53c952e2f5f0ec68ff79f0582edbc3f Mon Sep 17 00:00:00 2001 From: Mattbusel Date: Wed, 11 Mar 2026 05:33:11 -0400 Subject: [PATCH 2/3] fix: add Debug impl for CircuitBreakerFuture --- tonic/src/service/circuit_breaker.rs | 6 ++++++ 1 file changed, 6 insertions(+) diff --git a/tonic/src/service/circuit_breaker.rs b/tonic/src/service/circuit_breaker.rs index 547416205..db380de9a 100644 --- a/tonic/src/service/circuit_breaker.rs +++ b/tonic/src/service/circuit_breaker.rs @@ -260,6 +260,12 @@ pub struct CircuitBreakerFuture { _marker: std::marker::PhantomData, } +impl fmt::Debug for CircuitBreakerFuture { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + f.debug_struct("CircuitBreakerFuture").finish() + } +} + impl Future for CircuitBreakerFuture where F: Future>, From 6a888899724368d5132e2dd58be43b85d74d6d52 Mon Sep 17 00:00:00 2001 From: Mattbusel Date: Wed, 11 Mar 2026 05:44:05 -0400 Subject: [PATCH 3/3] fix(docs): remove links to private CircuitState variants --- tonic/src/service/circuit_breaker.rs | 7 ++----- 1 file changed, 2 insertions(+), 5 deletions(-) diff --git a/tonic/src/service/circuit_breaker.rs 
b/tonic/src/service/circuit_breaker.rs index db380de9a..01f0c0b12 100644 --- a/tonic/src/service/circuit_breaker.rs +++ b/tonic/src/service/circuit_breaker.rs @@ -117,11 +117,8 @@ impl CircuitBreakerLayer { /// /// - `failure_threshold`: consecutive failures before opening the circuit. /// - `success_threshold`: fraction of successes in the sliding window required to close - /// the circuit from [`HalfOpen`] (e.g. `0.6` means 60%). - /// - `timeout`: how long to wait in [`Open`] state before probing with a single request. - /// - /// [`HalfOpen`]: CircuitState::HalfOpen - /// [`Open`]: CircuitState::Open + /// the circuit from `HalfOpen` state (e.g. `0.6` means 60%). + /// - `timeout`: how long to wait in `Open` state before allowing a single probe request. pub fn new(failure_threshold: usize, success_threshold: f64, timeout: Duration) -> Self { Self { failure_threshold,