
feat(tonic-xds): implement gRFC A50 outlier detection (failure-percentage) + LoadBalancer integration#2619

Open
LYZJU2019 wants to merge 35 commits into grpc:master from LYZJU2019:lyzju2019/a50-outlier-detector

Conversation

@LYZJU2019
Contributor

@LYZJU2019 LYZJU2019 commented Apr 30, 2026

Summary

Implements gRFC A50: xDS Outlier Detection in tonic-xds — the failure-percentage algorithm, end-to-end and integrated with the load balancer. The config types landed previously in #2604; this PR adds the runtime detection logic and wires it into LoadBalancer.

Architecture

Three sites collaborate:

  • Data path (OutlierStatsRegistry::record_outcome, called from LoadBalancer::call) — bumps the picked channel's success/failure counter and evaluates the failure-percentage gates. On the not-ejected → ejected transition, sends the address through an mpsc channel.
  • Load balancer — drains the eject mpsc in poll_ready, consumes the matching ReadyChannel via ReadyChannel::eject, and tracks the resulting EjectedChannel in a KeyedFutures<_, UnejectedChannel<_>> (mirroring the existing pattern for ConnectingChannel). The picker only sees ready: IndexMap<_, ReadyChannel<_>>, so ejected channels are unpickable by construction.
  • Housekeeping actor (spawn_actor) — on each config.interval tick, resets counters and decrements multipliers for non-ejected channels. Never ejects or un-ejects.

Un-ejection is timer-driven: each EjectedChannel's Sleep fires at base × multiplier (capped by max_ejection_time) and yields UnejectedChannel::Ready(svc); the LB routes the resolved channel back to ready.

What's in this PR

  • OutlierChannelState — per-endpoint atomics: success/failure counters, is_qualifying, ejection_multiplier, ejected_at_nanos. Owned via Arc. try_eject / try_uneject use CAS so registry-level counters are updated exactly once per transition.
  • OutlierStatsRegistry: DashMap of states + cluster-wide qualifying_count / ejected_count + eject-signal mpsc. add_channel is idempotent (state survives reconnect). note_uneject and remaining_ejection are LB-facing helpers.
  • OutlierDetector — LB-side handle: registry, eject-signal receiver, actor AbortOnDrop. Returns RegistryAlreadyWired if a registry is wired to two LBs (the receiver is one-shot).
  • LoadBalancer integration: poll_ready drives, in order, poll_unejection → poll_connecting → poll_eject_requests. Change::Insert preserves outlier state across discovery flaps (matching grpc-go and Envoy); Change::Remove purges it.
  • Channel state machine (channel_state.rs): Idle → Connecting → Ready ↔ Ejected is now backed by the type-state types this PR exercises (ReadyChannel::eject → EjectedChannel: Future<Output = UnejectedChannel>).

A50 compliance

  • Failure-percentage uses strict > against the threshold.
  • Multiplier decrements at the same transition that un-ejects (gRFC step 6.b) — applied inside note_uneject so the timer-driven un-ejection matches Envoy semantics.
  • Ejection duration is min(base × multiplier, max(base, max_ejection_time)).
  • minimum_hosts and max_ejection_percent gated per-RPC.
  • enforcing_failure_percentage honoured via probability roll (Rng::pct_roll).
  • Outlier state survives Insert for an already-tracked address (counters, multiplier, ejection flag all preserved).
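
As a rough illustration of the ejection-duration rule in the list above, the sketch below computes min(base × multiplier, max(base, max_ejection_time)) with std::time::Duration. The function name and parameter names are assumptions made for this example and are not taken from the PR's code.

```rust
use std::time::Duration;

/// Hypothetical helper (name and signature are assumptions, not the PR's API).
/// Computes min(base * multiplier, max(base, max_ejection_time)).
fn ejection_duration(base: Duration, multiplier: u32, max_ejection_time: Duration) -> Duration {
    // The cap is never smaller than one base interval.
    let cap = base.max(max_ejection_time);
    // saturating_mul avoids a panic on overflow for very large multipliers.
    base.saturating_mul(multiplier).min(cap)
}
```

With a 30 s base and a 300 s max_ejection_time, a multiplier of 2 gives 60 s and a multiplier of 20 is clamped to 300 s. If max_ejection_time is configured below base, the cap falls back to base.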

What's deferred

  • Success-rate algorithm. OutlierDetectionConfig::success_rate, if set, is ignored (documented in the module docstring).
  • Wiring from ClusterResource into LB construction. Separate follow-up.

Test coverage

  • Algorithm: counters/snapshot, registry add/remove, failure-percentage threshold (above / below / boundary), minimum_hosts and request_volume gates, enforcing_failure_percentage = 0 never ejects, max_ejection_percent cap, multiplier decrement on healthy intervals, mpsc dispatch, A50 step 6.b decrement-on-uneject.
  • remaining_ejection / note_uneject: full-duration, cap, mid-eject remainder, past-deadline, not-ejected.
  • LB integration: failing endpoint gets ejected, healthy cluster stays unejected, endpoint removal cleans the registry, reinsert preserves counters, reinsert while ejected stays ejected, timer-driven un-ejection (via tokio::time::advance), registry can't be wired twice.

Test plan

  • cargo test -p tonic-xds --all-features — 260 lib tests + doctests pass
  • cargo fmt -p tonic-xds --check
  • cargo clippy -p tonic-xds --lib --tests --all-features -- -D warnings clean on changed files (pre-existing warnings in xds/resource/* are unrelated)

Implement the core gRFC A50 outlier-detection algorithm: per-endpoint
success/failure counters, the success-rate and failure-percentage
ejection algorithms, the ejection-multiplier state machine, and a
periodic sweep task that emits ejection/un-ejection decisions on a
channel.

`run_sweep` is pure (returns a Vec<EjectionDecision>); the sweep loop
spawned by `OutlierDetector::spawn` owns the channel sender and
forwards decisions, so dropping the returned `AbortOnDrop` ends the
loop and closes the receiver. Tests drive `run_sweep` directly without
the channel or tokio time mechanics.

Algorithm coverage matches the gRFC:
  - Success-rate ejection with configurable `stdev_factor`,
    `enforcing_success_rate`, `minimum_hosts`, `request_volume`.
  - Failure-percentage ejection with `threshold`,
    `enforcing_failure_percentage`, `minimum_hosts`, `request_volume`.
  - Ejection multiplier increments on each ejection, decays on healthy
    intervals; ejection duration is `base * multiplier` capped at
    `max(base, max_ejection_time)`.
  - `max_ejection_percent` caps total concurrent ejections.

Probability rolls go through an injectable `Rng` trait (defaulting to
`fastrand`) so tests can pin enforcement decisions.
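
A minimal sketch of what an injectable RNG of this kind could look like. The PR description names `Rng::pct_roll`, but the signature used here is an assumption, and a small xorshift generator is swapped in for `fastrand` so the example has no external dependencies. `FixedRng` and `should_enforce` are hypothetical names.

```rust
/// Hypothetical shape of the injectable RNG; the exact signature of
/// `pct_roll` in the PR is not shown, so this one is an assumption.
trait Rng {
    /// Returns true with probability `pct` percent (0..=100).
    fn pct_roll(&mut self, pct: u32) -> bool;
}

/// Stand-in for the `fastrand` default: a tiny xorshift64 generator.
struct XorShift(u64);

impl Rng for XorShift {
    fn pct_roll(&mut self, pct: u32) -> bool {
        // xorshift64 step; the state must be non-zero.
        self.0 ^= self.0 << 13;
        self.0 ^= self.0 >> 7;
        self.0 ^= self.0 << 17;
        // Map to 0..100, so pct = 0 never passes and pct >= 100 always does.
        (self.0 % 100) < u64::from(pct)
    }
}

/// Test double that pins every roll to a fixed outcome.
struct FixedRng(bool);

impl Rng for FixedRng {
    fn pct_roll(&mut self, _pct: u32) -> bool {
        self.0
    }
}

/// Enforcement gate: an address over the threshold is ejected only if the roll passes.
fn should_enforce(rng: &mut dyn Rng, enforcing_pct: u32) -> bool {
    rng.pct_roll(enforcing_pct)
}
```

Passing the RNG as a trait object keeps the algorithm code identical in tests and production; only the injected value differs.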

Standalone in this PR — no integration with the load balancer yet.
That lands in a follow-up alongside the per-endpoint outcome
interception layer.

Refs: https://github.com/grpc/proposal/blob/master/A50-xds-outlier-detection.md
Address two follow-up review comments from grpc#2604 (the merged config
PR) by folding the doc updates into this PR:

- Module docstring: describe the actual integration plan (an mpsc
  channel of EjectionDecisions polled by LoadBalancer, leveraging
  EjectedChannel) instead of the original "filter on the Discover
  stream" wording. Add intra-doc links to the relevant types.

- enforcing_success_rate / enforcing_failure_percentage: clarify
  that each is the *enforcement probability* — distinct from the
  per-algorithm threshold (stdev_factor for success-rate, threshold
  for failure-percentage). Note that 0 disables enforcement while
  still computing statistics.

Also fix an unresolved intra-doc link in the algorithm module.
Three spec-compliance fixes to `run_sweep` and the failure-percentage
algorithm:

1. Reorder the sweep to match A50 step order: snapshot counters → run
   success-rate algorithm → run failure-percentage algorithm → step-5
   housekeeping (decrement non-ejected multipliers, un-eject elapsed
   ejections). The previous order (un-eject before algorithms) caused
   spurious `Uneject` decisions whenever the same sweep also re-ejected
   the address. Per spec, re-ejection refreshes `ejected_at` to `now`
   before the un-eject check runs, so no transient un-eject is emitted.

2. Drop the `total > 0` traffic gate from the multiplier-decrement
   step. A50 says a non-ejected address with multiplier > 0 has its
   multiplier decremented every sweep, regardless of whether it
   received traffic that interval.

3. Failure-percentage now uses strict `>` against the threshold (was
   `>=`). Per A50: "If the address's failure percentage is greater
   than `failure_percentage_ejection.threshold`..." — an address
   sitting exactly at the threshold is not ejected.

Also: drop the explicit "skip ejected hosts from candidate list" pre-
filter. Per spec the algorithms iterate every address; ejected hosts
naturally fail the `request_volume` gate since they receive no traffic
in production. Behavior on real workloads is unchanged.

Test changes:
  - `re_ejection_doubles_duration` now asserts a single `Eject`
    decision (no transient `Uneject`) under the corrected sweep order.
  - New `failure_percentage_at_threshold_does_not_eject` covers the
    strict-`>` boundary.
  - New `multiplier_decrements_even_without_traffic` covers the
    no-traffic-gate fix.
Drop the success-rate algorithm and its tests from this PR so the
outlier-detection PR is minimal and stand-alone. The scaffolding
(sweep loop, multiplier state, counters, max-ejection-percent budget)
is unchanged and still exercised by the failure-percentage algorithm
plus the multiplier / un-eject / cap tests.

If `OutlierDetectionConfig.success_rate` is set on the cluster, it is
currently ignored. Documented in the module docstring with a pointer
to the follow-up PR.

Removes:
  - `OutlierDetector::run_success_rate` (mean / variance / sqrt math).
  - `success_rate` dispatch in `run_sweep`.
  - `run_failure_percentage`'s `!out.contains` filter — dead now that
    only one algorithm runs per sweep.
  - `success_rate_ejects_outlier_below_threshold` test.
  - `success_rate_no_ejection_when_all_uniform` test.
  - The `sr_config` test helper.
  - Unused `SuccessRateConfig` import.
@LYZJU2019 LYZJU2019 changed the title from "feat(tonic-xds): add OutlierDetector sweep engine (gRFC A50)" to "feat(tonic-xds): add OutlierDetector sweep engine + failure-percentage algorithm (gRFC A50)" Apr 30, 2026
Switch from `mpsc::unbounded_channel` to `mpsc::channel(256)` for the
ejection-decision stream that the sweep loop emits.

The decisions are edge-triggered (`Eject`/`Uneject` transitions, not
state snapshots), so the consumer must process every event in order;
we can't drop or coalesce. But we don't want unbounded memory growth
either if the consumer stalls. A bounded channel gives us:

  - Same correctness as unbounded — no events dropped, ordered delivery.
  - Bounded memory.
  - Natural backpressure: when the buffer fills, `tx.send().await`
    parks the sweep task, which (combined with
    `MissedTickBehavior::Skip`) throttles sweep cadence to whatever rate
    the consumer can drain. Computing more decisions than the consumer
    can apply just widens the desync.

Capacity is 256 — at most `2 * num_endpoints` decisions per sweep, so
this buffers several sweeps' worth of decisions for clusters of typical
size. A docstring on `DECISIONS_CHANNEL_CAPACITY` captures the
rationale for future readers.
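
The bounded-buffer behaviour described above can be sketched with the standard library's `sync_channel`, used here as a stand-in for tokio's `mpsc::channel` so the example needs no async runtime. The `Decision` type and `fill_until_full` helper are hypothetical. Note the difference: `try_send` reports a full buffer immediately, whereas the commit relies on `send().await` parking the producer instead.

```rust
use std::sync::mpsc::{sync_channel, TrySendError};

/// Hypothetical decision type for illustration only.
#[derive(Debug, PartialEq)]
struct Decision {
    endpoint: u32,
    eject: bool,
}

/// Fills a bounded channel and reports how many sends were accepted
/// before the buffer was full, plus the drained contents.
fn fill_until_full(capacity: usize) -> (usize, Vec<Decision>) {
    let (tx, rx) = sync_channel::<Decision>(capacity);
    let mut accepted = 0;
    for endpoint in 0..(capacity as u32 + 10) {
        match tx.try_send(Decision { endpoint, eject: true }) {
            Ok(()) => accepted += 1,
            // A full buffer refuses the event instead of growing without bound.
            Err(TrySendError::Full(_)) | Err(TrySendError::Disconnected(_)) => break,
        }
    }
    // Dropping the sender lets the drain below terminate.
    drop(tx);
    // Draining yields events in the order they were sent.
    (accepted, rx.iter().collect())
}
```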
Replace `spawn_with_rng` with `spawn_with`, taking an
`OutlierDetectorOptions` struct that bundles the RNG and the new
configurable `decisions_channel_capacity`. Defaults are unchanged
(`fastrand` RNG, capacity 256).

The hard-coded constant becomes `DEFAULT_DECISIONS_CHANNEL_CAPACITY`
and is no longer the only knob — production callers may want to bump
the bound for clusters with very large endpoint sets (worst case
`2 * num_endpoints` decisions per sweep) or unusually slow consumers.

Using a struct instead of a long argument list means future runtime
knobs (custom Tokio runtime, alternate backoff policies, observability
hooks, …) can be added without breaking call sites — callers typically
construct via `..Default::default()`.

The xDS-derived `OutlierDetectionConfig` stays separate from these
host-side runtime knobs, keeping a clean line between "what the xDS
proto specifies" and "how this binary chooses to host it."
…tests

Both `sweep_loop_emits_decisions_on_tick` and
`dropping_abort_stops_sweep_loop` previously used `tokio::time::sleep`
in `start_paused = true` mode. That works through the runtime's
auto-advance heuristic for parked tasks, but the heuristic is sensitive
to the order of pending wake-ups across multiple tasks and can be
flaky in practice.

  - `sweep_loop_emits_decisions_on_tick`: switch to
    `tokio::time::advance(150ms)` which explicitly moves the clock and
    yields until pending wake-ups have been polled — deterministic.
  - `dropping_abort_stops_sweep_loop`: drop the artificial sleep
    altogether. Aborting the JoinHandle wakes the spawned task
    synchronously; the runtime polls it, the harness observes the
    abort, and the task ends — dropping its sender. `rx.recv().await`
    parks briefly while that happens and then returns `None`. No time
    advancement needed.

Stress-tested both tests 50× back-to-back: all pass.
Comment thread tonic-xds/src/client/loadbalance/outlier_detection.rs Outdated
Comment thread tonic-xds/src/client/loadbalance/outlier_detection.rs Outdated
Comment thread tonic-xds/src/client/loadbalance/outlier_detection.rs Outdated
LYZJU2019 added 4 commits May 4, 2026 10:29
Rewrite the doc comment to be reference documentation rather than a
design narrative. Drops the editorializing ("the right behavior") and
the first-person reasoning, keeps the three things a developer needs:
what the constant controls, why this size, what happens at capacity
(and why decisions can't be dropped or coalesced), and how to override.
The previous design used two separate `AtomicU64`s and snapshotted via
two independent `swap` calls — the doc comment claimed this was atomic
across the pair, but it isn't: an RPC completing between the two swaps
inflates the next snapshot by one event, biasing the failure-percentage
computation slightly under contention.

Pack both counters into one `AtomicU64` (high 32 bits: successes, low
32 bits: failures). `record_*` becomes a single `fetch_add` (same hot-
path cost as before), `snapshot_and_reset` becomes a single `swap(0)`,
and the snapshot is now genuinely atomic across the pair — matching
the bucket-swap semantics the gRFC describes.

Each counter is capped at `u32::MAX` per sweep interval. Exceeding it
would carry into the other counter's bits, but the cap is unreachable
for realistic workloads (> 4 × 10⁹ RPCs to one endpoint within one
interval). Documented on the struct.
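
A minimal sketch of the packed layout this commit message describes, assuming the bit split stated above (high 32 bits successes, low 32 bits failures). The type name `PackedCounters` and the method names are assumptions, not the PR's exact code.

```rust
use std::sync::atomic::{AtomicU64, Ordering};

/// Sketch of the packed layout: high 32 bits hold successes,
/// low 32 bits hold failures. Names are illustrative assumptions.
#[derive(Default)]
struct PackedCounters {
    packed: AtomicU64,
}

impl PackedCounters {
    fn record_success(&self) {
        // Adding 1 << 32 bumps only the high (success) half.
        self.packed.fetch_add(1 << 32, Ordering::Relaxed);
    }

    fn record_failure(&self) {
        // Adding 1 bumps only the low (failure) half.
        self.packed.fetch_add(1, Ordering::Relaxed);
    }

    /// Returns (successes, failures) and resets both in one atomic swap.
    fn snapshot_and_reset(&self) -> (u32, u32) {
        let v = self.packed.swap(0, Ordering::Relaxed);
        ((v >> 32) as u32, v as u32)
    }
}
```

Because both halves live in one word, a single `swap(0)` reads and clears them together, which is the property the two-counter design lacked.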
Comment thread tonic-xds/src/client/loadbalance/outlier_detection.rs Outdated
Comment thread tonic-xds/src/client/loadbalance/outlier_detection.rs Outdated
Comment thread tonic-xds/src/client/loadbalance/outlier_detection.rs Outdated
Comment thread tonic-xds/src/client/loadbalance/outlier_detection.rs Outdated
Guard the `100 * failure / total` division against `total == 0`.
gRFC A50 doesn't forbid `request_volume == 0`, in which case the
qualifying filter `c.total >= request_volume` admits candidates with
zero traffic; the spec is silent on `0/0`, so skip those endpoints
rather than panic.
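
The guard can be sketched as follows, together with the strict `>` comparison and the `request_volume` gate. The function name and parameter list are assumptions for illustration; only the `100 * failure / total` expression and the `total >= request_volume` filter come from the commit message.

```rust
/// Hypothetical helper: decides whether one endpoint exceeds the
/// failure-percentage threshold. Names are illustrative assumptions.
fn exceeds_failure_threshold(
    successes: u64,
    failures: u64,
    request_volume: u64,
    threshold: u64,
) -> bool {
    let total = successes + failures;
    // Skip endpoints with no traffic: avoids 0/0 when request_volume == 0.
    if total == 0 || total < request_volume {
        return false;
    }
    // Strict `>`: an endpoint sitting exactly at the threshold is not ejected.
    100 * failures / total > threshold
}
```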
}

/// Variant of [`Self::spawn`] that accepts custom runtime options.
pub(crate) fn spawn_with(
Collaborator


What if we don't use these spawned loops at all and instead use RPCs as ticks for the outlier detection window? You can still use wall-clock time; instead of ticking every second, tick on every RPC call.

This works because tonic-xds only needs to handle ejection signals in loadbalancer.poll_ready(), which already takes &mut self, so outlier-detection decisions are global and free of race conditions.

LYZJU2019 and others added 6 commits May 4, 2026 15:27
…tests

Drop the test-only `sort` helper that compared `EjectionDecision`s by
their `Debug` string representation, which was fragile (any change to
the `Debug` impl would silently change ordering). Derive `PartialOrd`
and `Ord` on `EjectionDecision` (and on `EndpointAddress` /
`EndpointHost`, since the address is the inner field) and call
`Vec::sort` directly at the one test site.
When an already-ejected endpoint has in-flight RPCs that complete
during its ejection backoff, those completions accumulate on its
counter. At the next sweep the algorithm may "re-eject" the host
(refreshing its `ejected_at` timestamp and bumping the multiplier).
That action does not change the count of currently-ejected addresses,
so per A50's `max_ejection_percent` check it must not consume a slot
in the cap — but the previous code decremented the budget for it,
under-counting how many *new* ejections the cap allows.

Track the pre-sweep ejection state on each `Candidate` and only
decrement the budget for new ejections in the failure-percentage
algorithm. Add a regression test covering the specific scenario.
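
A rough sketch of the budget rule described above. The `already_ejected` field mirrors the `Candidate` field this commit mentions; the budget formula (floor of hosts × max_ejection_percent / 100, minus current ejections) and the `select_ejections` helper are assumptions for illustration and may differ from the PR's arithmetic.

```rust
/// Hypothetical per-sweep view of one endpoint.
struct Candidate {
    name: &'static str,
    over_threshold: bool,
    already_ejected: bool,
}

/// Returns the names to (re-)eject. The budget formula is an assumption:
/// floor(hosts * max_ejection_percent / 100) minus current ejections.
fn select_ejections(candidates: &[Candidate], max_ejection_percent: usize) -> Vec<&'static str> {
    let ejected_now = candidates.iter().filter(|c| c.already_ejected).count();
    let cap = candidates.len() * max_ejection_percent / 100;
    let mut budget = cap.saturating_sub(ejected_now);
    let mut out = Vec::new();
    for c in candidates.iter().filter(|c| c.over_threshold) {
        if c.already_ejected {
            // Re-ejection does not change the ejected count, so it is free.
            out.push(c.name);
        } else if budget > 0 {
            // Only a new ejection consumes a slot.
            budget -= 1;
            out.push(c.name);
        }
    }
    out
}
```

With four hosts, a 50% cap, and one host already ejected, one slot remains. Re-ejecting the ejected host leaves that slot available for one new ejection; the pre-fix behaviour would have spent it on the re-ejection.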
Replace the spawned sweep loop + mpsc channel with an on-demand model:
the detector exposes
`maybe_run_sweep(&mut self, now: Instant) -> Vec<EjectionDecision>`
and the consumer (the load balancer in a follow-up PR) calls it from
its own event loop (typically `poll_ready`), gated by wallclock time.

This eliminates a significant amount of machinery:
  - `tokio::spawn`, `sweep_loop`, `AbortOnDrop`, the mpsc channel.
  - The bounded-channel capacity option, its constant, and its docs
    (`OutlierDetectorOptions::decisions_channel_capacity`,
    `DEFAULT_DECISIONS_CHANNEL_CAPACITY`).
  - `OutlierDetectorOptions` itself — collapses to two constructors
    `new(config)` and `with_rng(config, rng)`.
  - The `Mutex` on `state` — the consumer's `&mut self` already
    serializes access.
  - Two `#[tokio::test(start_paused = true)]` tests that exercised the
    spawned task and its abort handle.

Sweep timing now depends on RPC traffic: when no RPCs flow, no sweeps
run. This matches A50's intent (sweeps happen approximately every
`interval` while traffic is flowing) and is observably equivalent
because ejection only matters during endpoint picking, which only
happens during RPCs. Suggested by the PR review.

Tests:
  - All algorithm-level tests rewritten to use owned `OutlierDetector`
    + `&mut self` calls, no `Mutex::lock()`, no Arc.
  - Three new `maybe_run_sweep_*` tests cover the interval gate:
    runs on first call, skips before interval elapsed, runs after.
  - Existing failure-percentage and multiplier/un-ejection tests
    unchanged in spirit; just adjusted to the new ownership model.
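
The interval gate exercised by the three `maybe_run_sweep_*` tests can be sketched as below. This is a stripped-down, hypothetical `SweepGate` that keeps only the timing logic and returns a bool, whereas the method in this commit returns the decisions; the details of the gate are an assumption.

```rust
use std::time::{Duration, Instant};

/// Hypothetical, stripped-down detector that keeps only the interval gate.
struct SweepGate {
    interval: Duration,
    last_sweep_at: Option<Instant>,
}

impl SweepGate {
    fn new(interval: Duration) -> Self {
        Self { interval, last_sweep_at: None }
    }

    /// Returns true if a sweep ran at `now`.
    fn maybe_run_sweep(&mut self, now: Instant) -> bool {
        let due = match self.last_sweep_at {
            // The first call always sweeps.
            None => true,
            // Later calls sweep only once a full interval has elapsed.
            Some(last) => now.duration_since(last) >= self.interval,
        };
        if due {
            self.last_sweep_at = Some(now);
        }
        due
    }
}
```

Taking `now` as a parameter rather than calling `Instant::now()` inside keeps the gate testable without tokio's paused-clock machinery.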
Pass through every doc comment and inline comment, removing rationale,
timeline language, and explanations that don't help a future reader.
Notable trims:

  - Module docstring drops "Knows nothing about the data path:" framing,
    the "lands in a follow-up PR" timeline (regression — flagged and
    removed earlier on a different doc), and the "(mean and standard
    deviation across the qualifying hosts)" parenthetical.
  - `Rng` trait drops the "Abstracted so tests can inject" rationale.
  - `OutlierDetector` struct drops "State is owned (no `Mutex`, no
    `Arc`):" framing.
  - `add_endpoint` / `remove_endpoint` / `with_rng` lose the trailing
    usage hints / explanatory parentheticals.
  - `maybe_run_sweep` / `run_sweep` tightened to facts-only.
  - Inline comments inside `run_sweep` drop "we model that" and
    "intentionally not yet dispatched in this PR" timeline.
  - Inline comment for the budget-decrement guard now points at
    `Candidate::already_ejected` instead of duplicating its doc.
  - Test `already_ejected_re_ejection_does_not_consume_budget` drops
    the "this would fail before the fix" git-history paragraph.
@LYZJU2019 LYZJU2019 marked this pull request as ready for review May 6, 2026 20:50
Comment on lines +29 to +39
/// Counts are packed into a single `AtomicU64` (high 32 bits:
/// successes, low 32 bits: failures), so each record is one `fetch_add`
/// and a snapshot is one `swap(0)`. Each counter is capped at
/// `u32::MAX` per sweep interval; exceeding that carries into the
/// other counter's bits but is unreachable for realistic workloads.
#[derive(Debug, Default)]
pub(crate) struct EndpointCounters {
/// High 32 bits: successes since last sweep.
/// Low 32 bits: failures since last sweep.
packed: AtomicU64,
}
Collaborator


\o/ Let's avoid such low-level dances and prioritize human readability. Packing them in the same atomic does not add much value because in reality, concurrent calls can succeed / fail before they bump up the packed counter. Forcing a snapshot view of the world by packing the counters together does not give us much.

Comment on lines +68 to +69
/// should keep its underlying connection alive (A50 requires
/// preserving connections across ejection).
Collaborator


We probably should extend A50 and allow the transition from ejection to connecting. Yet we can discuss this more offline in the future. For now it's fine.

Comment thread tonic-xds/src/client/loadbalance/outlier_detection.rs Outdated
/// load balancer's `poll_ready`).
pub(crate) struct OutlierDetector {
config: OutlierDetectionConfig,
state: HashMap<EndpointAddress, EndpointState>,
Collaborator


How do you plan to integrate this with LoadBalancer?

After reading the Envoy doc https://www.envoyproxy.io/docs/envoy/latest/intro/arch_overview/upstream/outlier.html , I think there are 3 types of outlier detections:

  1. Consecutive failures: these should be accumulated inline after ReadyChannels are called and handled directly in poll_ready of LoadBalancer. We can implement this in the future.

  2. Success / failure threshold: these need to be recalculated as call()s are performed on the channels. An O(n) scan over atomic counters is the most straightforward way, but we should avoid O(n) scans in poll_ready() because it is the performance-sensitive path.

///
/// Held by `&mut`; the consumer drives sweeps by calling
/// [`Self::maybe_run_sweep`] from its own event loop (typically the
/// load balancer's `poll_ready`).
Collaborator


We should avoid doing O(N) scans in poll_ready(). We can do this in a spawned actor that shares a DashMap of ReadyLbChannels with the LoadBalancer.

The outlier detection actor can notify LoadBalancer about ejections via a ReadyLbChannel.ejected future. The LoadBalancer can listen to these futures via a UnorderedFutures, so everything is O(1) in critical path.

LYZJU2019 added 5 commits May 8, 2026 11:24
The packed-AtomicU64 design fixed a specific gap raised earlier in
review (the snapshot's two swaps weren't atomic against each other),
but the cost in readability and the marginal correctness benefit no
longer justify it: the snapshot boundary is approximate either way —
RPCs land continuously, so some always cross between "this interval"
and "next interval" regardless of how the swap is implemented. For a
statistical threshold at 85% over typically hundreds-to-thousands of
RPCs per interval, the bias is well below the precision of the check.

Replace the packing with two plain `AtomicU64` counters and document
the snapshot's non-atomicity honestly on `snapshot_and_reset`.
Move outlier-detection state onto the channels themselves and run the
sweep in a spawned actor task that mutates the shared state. The load
balancer's `poll_ready` will observe ejection events O(1) per change
via per-channel `watch::Receiver::changed()` futures (wired in a
follow-up integration PR), so the O(n) scan stays off the LB's
critical path.

In `channel_state.rs`:
- Add `EndpointCounters` (lock-free success/failure atomics) and
  `OutlierChannelState` (counters + edge-triggered `watch::Sender<bool>`
  ejection signal). Both `pub(crate)`.
- `ReadyChannel` gains `outlier: Arc<OutlierChannelState>`.
  `ConnectingChannel::new` generates a fresh state; `with_outlier`
  preserves an existing one (for reconnect paths).
- `EjectedChannel` carries the outlier state through the cooldown so
  it survives the eject → un-eject cycle.

In `outlier_detection.rs`:
- `OutlierDetector` no longer owns counters; it owns only algorithm-
  private state (per-endpoint multiplier and last-ejection timestamp)
  and config + RNG.
- `OutlierStatsRegistry = Arc<DashMap<EndpointAddress,
  Arc<OutlierChannelState>>>` is the shared structure between the
  detector and the LB.
- `run_sweep(&mut self, now, &OutlierStatsRegistry)` scans the
  DashMap, snapshots counters via the channel state, decides
  ejections, and applies them inline by calling
  `OutlierChannelState::eject()` / `uneject()`. Algorithm state for
  removed channels is GC'd per-sweep.
- `OutlierDetector::spawn(config, channels)` spawns the actor task on
  a `tokio::time::interval` ticker; returns `AbortOnDrop` for
  lifecycle control. `EjectionDecision` enum, `maybe_run_sweep`,
  `last_sweep_at`, `add_endpoint`, and `remove_endpoint` are all
  removed — the actor + shared state replaces them.
- Tests rewritten to drive the new shape: construct a
  `DashMap<EndpointAddress, Arc<OutlierChannelState>>`, populate
  counters, call `run_sweep` and observe `is_ejected()` directly.
  Adds two actor-level tests covering `spawn` + `AbortOnDrop`.
Pivot the algorithm split per design feedback:

- Per-RPC detection runs inline on each call completion via
  `OutlierStatsRegistry::record_outcome`. The wrapper records the
  outcome on the channel's `OutlierChannelState`, evaluates the
  failure-percentage threshold against the channel's local counters,
  and ejects the channel directly by flipping its
  `watch::Sender<bool>`. Cluster-wide gates (`minimum_hosts`,
  `max_ejection_percent`) are enforced via two atomic counters on the
  registry, kept in sync as channels cross thresholds.
- The spawned actor runs only interval-boundary housekeeping: counter
  reset, un-eject if backoff has elapsed, decrement multipliers for
  non-ejected channels. The actor never makes ejection decisions.

Reaction latency drops from up to one `interval` (default 10s) to the
first failed RPC after `request_volume` is reached, while
`LoadBalancer::poll_ready` stays O(1) — ejections are observed via
per-channel `watch::Receiver::changed()` futures in a
`FuturesUnordered`, which the integration PR will wire.

Implementation:

- `OutlierChannelState` (channel_state.rs) gains atomic ejection-time
  state: `is_qualifying: AtomicBool`, `ejection_multiplier: AtomicU32`,
  `ejected_at_nanos: AtomicU64` with a constant `epoch: Instant`.
  `try_eject` / `try_uneject` are CAS-style and return whether the
  call performed the transition, so callers can update registry
  counters exactly once.
- `OutlierStatsRegistry` (outlier_detection.rs) is the new central
  type. Holds the `DashMap<EndpointAddress, Arc<OutlierChannelState>>`,
  cluster-wide atomic counters, config, and RNG. All methods take
  `&self` (concurrent access from data path and actor).
- `OutlierDetector` struct removed; everything lives on the registry.
  The actor is spawned via the free `spawn_actor(registry)` function.
- Tests rewritten: drive `record_outcome` and observe `is_ejected()`;
  drive `run_housekeeping` for interval-boundary scenarios.
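
The "exactly once per transition" property of the CAS-style `try_eject` / `try_uneject` can be sketched like this. `ChannelState` is a hypothetical reduction of the per-channel state to its ejection flag, and the free `eject` function stands in for the registry logic; neither is the PR's actual code.

```rust
use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering};

/// Hypothetical reduction of the per-channel state to its ejection flag.
#[derive(Default)]
struct ChannelState {
    ejected: AtomicBool,
}

impl ChannelState {
    /// Returns true only for the caller that flipped false -> true.
    fn try_eject(&self) -> bool {
        self.ejected
            .compare_exchange(false, true, Ordering::AcqRel, Ordering::Acquire)
            .is_ok()
    }

    /// Returns true only for the caller that flipped true -> false.
    fn try_uneject(&self) -> bool {
        self.ejected
            .compare_exchange(true, false, Ordering::AcqRel, Ordering::Acquire)
            .is_ok()
    }
}

/// Registry-level counter, bumped only by the call that won the transition.
fn eject(state: &ChannelState, ejected_count: &AtomicUsize) {
    if state.try_eject() {
        ejected_count.fetch_add(1, Ordering::Relaxed);
    }
}
```

If two completions race to eject the same channel, only one `compare_exchange` succeeds, so the cluster-wide count cannot drift.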
Outlier-detection state belongs to `ReadyChannel` — the only state
machine variant that serves traffic. `ConnectingChannel` is just a
connect future and `EjectedChannel` is just a cooldown timer; neither
reads or writes counters or the ejection signal, so neither should
carry the `Arc<OutlierChannelState>`.

Changes:

- `ConnectingChannel<S>::Output` is now bare `S` (was
  `ReadyChannel<S>`). The captured async block no longer holds an
  outlier state; the address is kept by the caller (typically as the
  key in `KeyedFutures`).
- `EjectedChannel<S>` drops its `outlier` field.
  `UnejectedChannel::Ready(S)` now carries a bare service; the consumer
  re-attaches the registry-supplied outlier state when wrapping it back
  into a `ReadyChannel`.
- `ReadyChannel` gains an explicit `new(addr, inner, outlier)`
  constructor so the outlier state is required at construction time.
- `ReadyChannel::eject` and `ReadyChannel::reconnect` drop the
  outlier reference — it lives in the registry, keyed by address, and
  survives the cycle.
- `LoadBalancer::connecting` is now
  `KeyedFutures<EndpointAddress, C::Service>` (was over
  `ReadyChannel<C::Service>`). `poll_connecting` wraps the resolved
  service into a `ReadyChannel` with a fresh `OutlierChannelState`;
  the integration PR replaces the fresh state with one supplied by
  the `OutlierStatsRegistry`.

Tests in `channel_state.rs` use a small `wrap_ready` helper to build
`ReadyChannel` instances from the bare services returned by
`IdleChannel::connect()`.
Wire the outlier-detection registry into `LoadBalancer` end-to-end:

- `LoadBalancer::with_outlier(discovery, connector, picker,
  Some(registry))` constructs an LB that participates in outlier
  detection. The plain `new(...)` constructor is a thin alias that
  passes `None` (no outlier detection); existing tests are
  unchanged.
- At construction, the housekeeping actor is spawned via
  `spawn_actor(registry)`; the returned `AbortOnDrop` is stored on
  the LB so the actor stops when the LB is dropped.
- `poll_discover` now also unhooks the registry entry, the ejection
  signal stream, and any ejected slot when an address is removed or
  re-inserted.
- `poll_connecting` registers the new channel with the registry
  (`registry.add_channel(addr)`), subscribes to its ejection signal
  via `WatchStream::from_changes`, and inserts the stream into a
  `StreamMap<EndpointAddress, WatchStream<bool>>`.
- A new `poll_ejection_signals` step in `poll_ready` drains the
  `StreamMap` in amortized O(1) per transition, moving channels
  between `ready: IndexMap` and a new `ejected: HashMap`. The picker
  continues to see only `ready`, so ejected endpoints are
  automatically excluded from selection.
- `call` clones the picked channel's `OutlierChannelState` and, after
  the inner call completes, invokes `registry.record_outcome(state,
  result.is_ok())`. Per-RPC detection runs inline; the LB's critical
  path stays O(1) in the number of endpoints.

Other changes:
- `OutlierStatsRegistry::add_channel` is idempotent: re-inserting an
  existing address returns the existing state so reconnect cycles
  preserve counters and ejection bookkeeping.
- Cargo: `tokio-stream` gains the `sync` feature to expose
  `WatchStream`.

Three integration tests cover: a failing endpoint gets ejected and
removed from `ready`; a healthy cluster sees no ejections; endpoint
removal cleans up the registry.
@LYZJU2019 LYZJU2019 changed the title from "feat(tonic-xds): add OutlierDetector sweep engine + failure-percentage algorithm (gRFC A50)" to "feat(tonic-xds): implement gRFC A50 outlier detection (failure-percentage) + LoadBalancer integration" May 11, 2026
LYZJU2019 added 2 commits May 11, 2026 15:05
The four outlier-related fields on `LoadBalancer` (registry, ejected
pool, ejection-signal streams, actor handle) always lived in lockstep
— either all four were present (outlier detection enabled) or all
four were absent. Bundle them into a single `OutlierDetector<S>`
struct stored as `Option<OutlierDetector<C::Service>>` so the type
system enforces the invariant and the LB methods that touch outlier
state become one-line delegations.
Match grpc-go (`internal/xds/balancer/outlierdetection`) and Envoy
(`BaseDynamicClusterImpl::updateDynamicHostList` reusing existing
`HostSharedPtr`s): outlier-detection state is keyed by stable
endpoint identity and survives a transient discovery flap.
Previously, every `Change::Insert` ran the same purge path as
`Change::Remove`, wiping the registry entry along with the connecting
/ ready / ejected slots — a brief disappearance lost the channel's
counters and ejection multiplier.

Split the path:
  - `purge_endpoint` (Remove) — cancels connecting, clears ready, and
    drops all outlier bookkeeping including the registry entry.
  - `reset_active_slots` (Insert) — cancels connecting, clears ready,
    and drops the obsolete `ReadyChannel` from the detector's ejected
    pool, but leaves the registry entry and ejection-signal
    subscription intact.
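
The split can be sketched as follows. This is a minimal illustration rather than the PR's code: `Slots`, its fields, and the use of plain sets and a counter map are hypothetical stand-ins for the LB's connecting / ready / ejected slots and the registry entry.

```rust
use std::collections::{HashMap, HashSet};

/// Hypothetical stand-in for the LB's per-endpoint bookkeeping.
#[derive(Default)]
struct Slots {
    connecting: HashSet<String>,
    ready: HashSet<String>,
    ejected: HashSet<String>,
    /// Stand-in for the registry entry: a failure counter per address.
    registry: HashMap<String, u64>,
}

impl Slots {
    /// Insert path: drop the active slots, keep the registry entry.
    fn reset_active_slots(&mut self, addr: &str) {
        self.connecting.remove(addr);
        self.ready.remove(addr);
        self.ejected.remove(addr);
    }

    /// Remove path: drop the active slots and the registry entry.
    fn purge_endpoint(&mut self, addr: &str) {
        self.reset_active_slots(addr);
        self.registry.remove(addr);
    }
}
```

Only `purge_endpoint` touches the registry map, so counters recorded before a re-insert are still there afterwards.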

`OutlierDetector::register` now only inserts a new signal subscription
when one is not already present, so a pending ejection transition is
not dropped by a redundant resubscribe. `poll_connecting` checks the
preserved `state.is_ejected()` and routes a re-discovered ejected
channel directly into the ejected pool via the new `place_ejected`,
avoiding any window where traffic could be routed to a logically
ejected channel.

Adds two regression tests:
  - `test_outlier_detection_reinsert_preserves_state` — counters
    survive Insert for an existing address; same `Arc` is returned.
  - `test_outlier_detection_reinsert_while_ejected_stays_ejected` —
    re-discovered ejected channel lands in the ejected pool, not
    `ready`.
@LYZJU2019 LYZJU2019 force-pushed the lyzju2019/a50-outlier-detector branch from 0ea0e7e to be41f3f Compare May 11, 2026 22:05
LYZJU2019 added 10 commits May 12, 2026 10:54
… machine

Use the type-state machinery from `channel_state.rs`
(`ReadyChannel::eject` → `EjectedChannel` → `UnejectedChannel`) as
the primary mechanism for outlier-detection ejection, retiring the
parallel `watch::Sender<bool>` + dual-map design.

Ejected channels are now unpickable by construction, enforced at
compile time: the picker takes `ReadyChannel<S>`, and ejected channels
live in a `KeyedFutures<_, UnejectedChannel<_>>`, mirroring the
existing pattern for `ConnectingChannel`. This brings the
outlier-detection LB integration in line with the project's existing
idioms and gives the previously unused channel state machine its
first production caller.
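
A stripped-down sketch of the type-state idea, assuming simplified types: the real `EjectedChannel` is a future driven by a `Sleep`, whereas here the transition back is an explicit `uneject` call, and `pick` is a hypothetical picker.

```rust
use std::time::Duration;

/// A channel the picker is allowed to select.
struct ReadyChannel<S> {
    svc: S,
}

/// An ejected channel; there is no way to hand this to `pick`.
struct EjectedChannel<S> {
    svc: S,
    timeout: Duration,
}

impl<S> ReadyChannel<S> {
    /// Consumes the ready channel, so it cannot remain pickable.
    fn eject(self, timeout: Duration) -> EjectedChannel<S> {
        EjectedChannel { svc: self.svc, timeout }
    }
}

impl<S> EjectedChannel<S> {
    /// Stand-in for the timer firing (simplification of the real future).
    fn uneject(self) -> ReadyChannel<S> {
        ReadyChannel { svc: self.svc }
    }
}

/// Hypothetical picker: it only accepts `ReadyChannel`s.
fn pick<S>(ready: &[ReadyChannel<S>]) -> Option<&ReadyChannel<S>> {
    ready.first()
}
```

Because `eject` takes `self` by value, passing an ejected channel to `pick` is a type error rather than a runtime check.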

Architecture:

  - **Data path** still uses `OutlierStatsRegistry::record_outcome` to
    apply the failure-percentage algorithm per-RPC. On transition to
    ejected the registry sends the address through an mpsc
    `UnboundedSender` rather than flipping a watch flag.
  - **LoadBalancer** drains the mpsc in `poll_ready`, consumes the
    matching `ReadyChannel` via `.eject(EjectionConfig { timeout, .. })`,
    and tracks the resulting `EjectedChannel` in a second
    `KeyedFutures`. Each ejected channel's internal `Sleep` fires
    exactly at `base × multiplier` (capped at `max_ejection_time`),
    yielding `UnejectedChannel::Ready(svc)`; `poll_unejection` drains
    it on the next `poll_ready` and routes the channel back into
    `ready`.
  - **Housekeeping actor** simplifies: it resets counters and
    decrements multipliers on the `config.interval` boundary, but no
    longer un-ejects — un-ejection is timer-driven by `EjectedChannel`.
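
The drain step in `poll_ready` can be approximated as below. This sketch uses `std::sync::mpsc` and plain maps so it runs without an async runtime; the PR itself uses a tokio `UnboundedSender` and `KeyedFutures`, and `Lb` / `drain_ejections` are hypothetical names.

```rust
use std::collections::HashMap;
use std::sync::mpsc::Receiver;

/// Hypothetical, simplified load balancer state. The `u32` values stand
/// in for channel objects.
struct Lb {
    ready: HashMap<String, u32>,
    ejected: HashMap<String, u32>,
    eject_rx: Receiver<String>,
}

impl Lb {
    /// Drain every pending eject signal without blocking and move the
    /// matching channel from `ready` to `ejected`.
    fn drain_ejections(&mut self) {
        while let Ok(addr) = self.eject_rx.try_recv() {
            // An address that is no longer in `ready` (for example,
            // already removed by discovery) is skipped.
            if let Some(ch) = self.ready.remove(&addr) {
                self.ejected.insert(addr, ch);
            }
        }
    }
}
```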

`OutlierStatsRegistry` gains two methods:

  - `note_uneject(state)` — clears the `ejected_at_nanos` atomic on
    the channel state and decrements `ejected_count`. Called by the
    LB when an `EjectedChannel`'s timer fires.
  - `remaining_ejection(state, now)` — computes how much of the
    ejection window is left, capped by `max_ejection_time`. Used by
    the LB on initial ejection (full duration) and on re-discovery
    (remaining duration) to size the `EjectionConfig::timeout`.
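
The arithmetic behind `remaining_ejection` can be sketched as a free function. The signature is an assumption made for illustration (the real method takes the channel state and `now`), as are the choices to return `None` for a non-ejected channel and a zero duration once the deadline has passed.

```rust
use std::time::Duration;

/// Sketch: how much of the ejection window is left.
/// `elapsed_since_eject` is `None` when the channel is not ejected.
fn remaining_ejection(
    base: Duration,
    max_ejection_time: Duration,
    multiplier: u32,
    elapsed_since_eject: Option<Duration>,
) -> Option<Duration> {
    let elapsed = elapsed_since_eject?;
    // Full window: base × multiplier, capped by max_ejection_time.
    let window = base.saturating_mul(multiplier).min(max_ejection_time);
    // Past-deadline saturates to zero instead of underflowing.
    Some(window.saturating_sub(elapsed))
}
```

With a 30 s base and a 300 s cap, a multiplier of 2 and 20 s already elapsed leaves 40 s; a multiplier of 20 is capped to the full 300 s.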

`OutlierChannelState` drops the `watch::Sender<bool>` field entirely;
`is_ejected` / `try_eject` / `try_uneject` now use atomic CAS on
`ejected_at_nanos` as the single source of truth. The `OutlierDetector`
struct simplifies to `{ registry, eject_rx, _actor }` — no generic
parameter, no internal `ejected` map, no signal-stream aggregator.
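
A minimal sketch of using one atomic as the source of truth for ejection. It assumes `0` is the "not ejected" sentinel (so callers must pass a nonzero timestamp); the sentinel and the exact method signatures are assumptions, not taken from the PR.

```rust
use std::sync::atomic::{AtomicU64, Ordering};

/// Assumed sentinel meaning "not ejected".
const NOT_EJECTED: u64 = 0;

struct State {
    ejected_at_nanos: AtomicU64,
}

impl State {
    fn is_ejected(&self) -> bool {
        self.ejected_at_nanos.load(Ordering::Acquire) != NOT_EJECTED
    }

    /// Returns true only for the caller that wins the
    /// not-ejected -> ejected transition. `now_nanos` must be nonzero.
    fn try_eject(&self, now_nanos: u64) -> bool {
        self.ejected_at_nanos
            .compare_exchange(NOT_EJECTED, now_nanos, Ordering::AcqRel, Ordering::Acquire)
            .is_ok()
    }

    /// Returns true only for the caller that wins the
    /// ejected -> not-ejected transition.
    fn try_uneject(&self) -> bool {
        self.ejected_at_nanos
            .fetch_update(Ordering::AcqRel, Ordering::Acquire, |v| {
                (v != NOT_EJECTED).then_some(NOT_EJECTED)
            })
            .is_ok()
    }
}
```

Because each transition succeeds for exactly one caller, registry-level bookkeeping such as `ejected_count` can be updated once per transition by whoever gets `true`.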

Re-discovery while ejected (Insert for an address whose preserved
state says `is_ejected`) re-ejects the new channel with the
`remaining_ejection` duration so the original backoff is honored
rather than restarted; if the deadline has already passed, the
channel is un-ejected immediately. Behavior matches grpc-go and
Envoy.

Adds tests:
  - `OutlierStatsRegistry::{remaining_ejection,note_uneject}` — five
    new unit tests covering full duration, cap, mid-eject subtraction,
    past-deadline, and not-ejected cases.
  - `ejection_dispatches_address_through_mpsc` — verifies the data
    path sends through the mpsc on transition.
  - `housekeeping_leaves_ejected_multipliers_alone` — guards the new
    invariant that the actor no longer touches ejected channels.
  - `test_outlier_detection_timer_driven_unejection` — end-to-end LB
    test that an ejected channel returns to `ready` after
    `base × multiplier` elapses (with `tokio::time::advance`).

Adds `KeyedFutures::contains_key` for test access; no production
caller depends on it.
A50 step 6 runs once per interval and (a) un-ejects hosts whose
backoff has elapsed, then (b) decrements the multiplier for every
non-ejected host — in the same sweep. Envoy implements this exactly,
so a host un-ejected at sweep N has its multiplier decremented at
sweep N.

In this PR's design un-ejection is timer-driven (each EjectedChannel
holds its own Sleep), decoupled from the housekeeping sweep. With
the previous note_uneject, the multiplier was only decremented at
the next housekeeping interval. A re-eject before that interval
would see a stale (one-higher) multiplier and back off too
aggressively relative to the spec.

Apply the decrement inside note_uneject so it happens atomically
with the transition. The actor's housekeeping decrement still runs
at each interval; saturating arithmetic keeps the eventual
decrement-to-zero correct.

Adds a focused test (`re_eject_after_uneject_uses_fresh_multiplier`)
verifying that a re-ejection immediately after un-ejection sizes the
remaining-ejection duration with the fresh multiplier (base × 1),
not the stale one (base × 2).
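
The effect can be illustrated with a small model. It assumes the multiplier starts at 0, is incremented on ejection, and sizes the window as base × multiplier; `State::eject` is a hypothetical helper, not the PR's `try_eject`.

```rust
use std::sync::atomic::{AtomicU32, Ordering};
use std::time::Duration;

struct State {
    multiplier: AtomicU32,
}

impl State {
    /// Hypothetical helper: bump the multiplier and return the window
    /// that this ejection would use.
    fn eject(&self, base: Duration) -> Duration {
        let m = self.multiplier.fetch_add(1, Ordering::AcqRel) + 1;
        base.saturating_mul(m)
    }

    /// Decrement as part of the un-eject transition itself, rather
    /// than waiting for the next housekeeping tick.
    fn note_uneject(&self) {
        let _ = self
            .multiplier
            .fetch_update(Ordering::AcqRel, Ordering::Acquire, |m| {
                Some(m.saturating_sub(1))
            });
    }
}
```

In this model, eject, un-eject, then re-eject yields base × 1 for the second window; without the decrement in `note_uneject` it would be base × 2.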
The load-then-store implementation could lose decrements when:
  - actor housekeeping decrements concurrently with note_uneject
    (both call decrement_multiplier);
  - or either of those races a data-path try_eject (which does
    fetch_add on the same atomic).

Swap to fetch_update with a saturating closure so the read-modify-
write is atomic. Bias was bounded at ±1 before, so this is not a
correctness fix per se — just closes a small race window cleanly.
…s wired twice

The registry's eject-signal mpsc receiver is one-shot — a registry
can drive at most one LoadBalancer. The previous implementation
panicked at runtime if a misuse handed the same registry to two
`with_outlier` calls.

Return a typed error (`RegistryAlreadyWired`) from
`OutlierStatsRegistry::take_eject_rx`, propagated through
`OutlierDetector::new` and `LoadBalancer::with_outlier`.
`LoadBalancer::new` stays infallible because the `outlier=None` path
does not invoke the registry hand-off.

Adds `test_outlier_registry_cannot_be_wired_twice` to lock the
contract.
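
One way to express a one-shot receiver hand-off with a typed error is sketched below. It uses `std::sync::mpsc` and a `Mutex<Option<_>>` so it runs standalone; the PR uses a tokio mpsc, and the `Registry` layout here is an assumption.

```rust
use std::sync::mpsc::{channel, Receiver, Sender};
use std::sync::Mutex;

/// Typed error returned on the second hand-off attempt.
#[derive(Debug, PartialEq, Eq)]
struct RegistryAlreadyWired;

/// Hypothetical, simplified registry holding the eject-signal channel.
struct Registry {
    eject_tx: Sender<String>,
    eject_rx: Mutex<Option<Receiver<String>>>,
}

impl Registry {
    fn new() -> Self {
        let (eject_tx, rx) = channel();
        Registry { eject_tx, eject_rx: Mutex::new(Some(rx)) }
    }

    /// The receiver can be taken exactly once; later calls get an
    /// error instead of a panic.
    fn take_eject_rx(&self) -> Result<Receiver<String>, RegistryAlreadyWired> {
        self.eject_rx
            .lock()
            .unwrap()
            .take()
            .ok_or(RegistryAlreadyWired)
    }
}
```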
Store the endpoint address directly on `OutlierChannelState` at
construction time and expose `state.addr()` so downstream callers
don't need to thread `(addr, state)` pairs alongside the state.

API impact:
  - `OutlierChannelState::new(addr)` now takes the address explicitly.
  - `OutlierStatsRegistry::record_outcome(state, success)` drops its
    `addr` parameter; the mpsc dispatch reads `state.addr()`.
  - `ReadyChannel::addr()` (added earlier this PR only for the
    `record_outcome` thread-through) is removed — no remaining caller.

The data path now passes just the `Arc<OutlierChannelState>` to
`record_outcome`, which is cleaner and removes the awkwardness of
two parameters that always travel together.
Tighten doc comments across the outlier-detection module, the
channel state machine, and the LB. Remove rationale-style narrative
and references to past designs; keep API contracts, gRFC references,
and non-obvious invariants. No code changes.
Pass the outlier state through `ReadyChannel::eject` →
`EjectedChannel` so its `Future::poll` can yield
`UnejectedChannel::Ready(ReadyChannel<S>)` with the state already
reattached, instead of `Ready(S)` and asking the LB to rebuild.

Symmetric ends for the `Ready ↔ Ejected` transition (both speak
`ReadyChannel`), and `poll_unejection`'s ready arm drops its
state-lookup + `ReadyChannel::new` rebuild — it just calls
`note_uneject(ready.outlier())` and inserts. The `Connecting` arm
stays asymmetric since the fresh connect produces a bare service.
…l::new

`ConnectingChannel` stopped constructing `ReadyChannel` internally
when outlier state was added (the registry-supplied
`Arc<OutlierChannelState>` has to come from the LB, not the
channel-state type), and the second parameter `_addr` has been
ignored ever since. Drop it; callers already hand `KeyedFutures` the
canonical address as the key.
The `Rng` trait existed only as a test seam for the
`enforcing_failure_percentage` probability roll, but every caller in
both the algorithm tests and the LB integration tests uses
`enforcing = 100` or `enforcing = 0` — values for which `roll` short-
circuits without consulting the RNG. The trait, the `FastRandRng`
default, and the test-side `FixedRng` / `AlwaysFireRng` impls were
all bookkeeping for a code path none of them exercised.

Inline the `fastrand::u32(0..100)` call into `roll` and remove the
trait. `OutlierStatsRegistry::with_rng` collapses into `new`.
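
The short-circuit behavior can be sketched as follows. To keep the example free of external crates, the random draw is passed in as a closure; that `sample` parameter is a stand-in for the inlined `fastrand::u32(0..100)` call, and the exact comparison is an assumption.

```rust
/// Sketch of the enforcement roll. `sample` is a stand-in for a
/// uniform draw from 0..100 and is only called for 0 < pct < 100.
fn roll(enforcing_pct: u32, sample: impl FnOnce() -> u32) -> bool {
    match enforcing_pct {
        0 => false,             // never enforce, no draw needed
        p if p >= 100 => true,  // always enforce, no draw needed
        p => sample() < p,      // enforce with probability p / 100
    }
}
```

With `enforcing = 0` or `enforcing = 100` the closure is never invoked, which is why tests using only those values never exercised the RNG seam.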