Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 7 additions & 0 deletions cmd/sluice/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -513,8 +513,15 @@ func main() {
if failoverBroker != nil {
// Plain text: TelegramChannel.Notify sends with no parse
// mode, so markdown backticks would render literally.
// Exhausted: no distinct member to fail over to (every
// member cooling) — report it as pool exhaustion, NOT a
// self-referential "X -> X" transition.
msg := fmt.Sprintf("pool %s failed over %s -> %s (%s)",
ev.Pool, ev.From, ev.To, ev.Reason)
if ev.Exhausted {
msg = fmt.Sprintf("pool %s exhausted: all members cooling down (%s); no healthy account to fail over to",
ev.Pool, ev.Reason)
}
ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
defer cancel()
for _, ch := range failoverBroker.Channels() {
Expand Down
2 changes: 1 addition & 1 deletion cmd/sluice/pool.go
Original file line number Diff line number Diff line change
Expand Up @@ -235,7 +235,7 @@ func handlePoolRotate(args []string) error {
return fmt.Errorf("pool %q rotate: resolved active member %q is not in the pool snapshot (membership changed under the rotate); re-check with \"sluice pool list %s\"", name, active, name)
}
until := time.Now().Add(vault.AuthFailCooldown)
wrote, err := db.SetCredentialHealthIfPoolMemberEpoch(active, name, rotateEpoch, "cooldown", until, "manual rotate")
wrote, err := db.SetCredentialHealthIfPoolMemberEpoch(active, name, rotateEpoch, "cooldown", until, vault.ManualRotateReason)
if err != nil {
return err
}
Expand Down
12 changes: 12 additions & 0 deletions internal/proxy/addon.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ import (
"strings"
"sync"
"sync/atomic"
"time"

mitmproxy "github.com/lqqyt2423/go-mitmproxy/proxy"
"github.com/nemirovsky/sluice/internal/audit"
Expand Down Expand Up @@ -120,6 +121,17 @@ type SluiceAddon struct {
// on the hot StreamResponseModifier path.
dlpStreamWarned sync.Map

// poolNoticeMu/poolNoticeAt deduplicate pool failover / exhaustion
// operator notices (audit row + Telegram) within poolNoticeDedupWindow.
// A failing pooled destination produces a burst of identical signals
// (agent retries + pipelined requests racing the synchronous
// MarkCooldown); without this an exhausted pool spammed one notice per
// retry. Mutex-guarded (not sync.Map) so a concurrent burst cannot have
// two goroutines both miss the dedup and both emit. See
// shouldEmitPoolNotice in pool_failover.go.
poolNoticeMu sync.Mutex
poolNoticeAt map[string]time.Time

// refreshGroup deduplicates concurrent OAuth token refresh responses
// for the same credential. Keyed by credential name so only one
// vault update occurs when multiple requests trigger simultaneous
Expand Down
99 changes: 85 additions & 14 deletions internal/proxy/pool_failover.go
Original file line number Diff line number Diff line change
Expand Up @@ -140,6 +140,13 @@ type FailoverEvent struct {
Reason string // short tag: 429 | 403 | 401 | invalid_grant | invalid_token
Class failoverClass
Until time.Time // member cooldown expiry just applied
// Exhausted is true when there was NO distinct member to fail over to
// (every member is cooling and the soonest-recovering one is the member
// that just failed). The cooldown is still applied to From for
// durability, but this is a pool-exhaustion signal, not a real
// transition: the operator notice and audit action say so, and it is
// deduplicated so an agent's retry storm produces one line, not N.
Exhausted bool
// Epoch is the From member's membership epoch in the resolver
// generation that produced this failover. The durable guarded write
// commits only if (From, Pool, Epoch) is still a live membership row,
Expand Down Expand Up @@ -447,16 +454,49 @@ func (a *SluiceAddon) handlePoolFailover(f *mitmproxy.Flow) {
to = next
}

log.Printf("[POOL-FAILOVER] pool %q: %s -> %s (%s); member %q cooling down until %s",
pool, from, to, tag, from, until.Format(time.RFC3339))
// to == from means ResolveActive degraded back to the member that just
// failed: every member is cooling and the soonest-recovering one IS
// `from`. There is NO distinct member to fail over to. Emitting a
// "<from> -> <from>" cred_failover here (and one Telegram notice per
// request) was both meaningless and a notification storm — the agent
// retries N times, each retry re-fails on the still-exhausted member
// and re-entered this path, producing N identical "failed over A -> A"
// notices. Classify it honestly as pool exhaustion instead.
exhausted := to == from

// Audit: emit a cred_failover action with the documented Reason shape
// "<pool>:<from>-><to>:<tag>". Safe to call with a nil auditLog. The
// blake3 hash chain is appended synchronously by FileLogger.Log; the
// Deduplicate identical signals within a short window. Concurrent
// in-flight requests (pipelined agents) and retries that race the
// synchronous MarkCooldown above would otherwise each emit one audit
// row + one operator notice. One per (pool,from,to,tag) per window is
// all the operator needs; the cooldown itself was already applied
// unconditionally above, so suppressing the notice loses nothing.
if !a.shouldEmitPoolNotice(pool, from, to, tag) {
return
}

if exhausted {
log.Printf("[POOL-FAILOVER] pool %q exhausted: all members cooling (%s); no failover target, serving least-bad %q",
pool, tag, from)
} else {
log.Printf("[POOL-FAILOVER] pool %q: %s -> %s (%s); member %q cooling down until %s",
pool, from, to, tag, from, until.Format(time.RFC3339))
}

// Audit: a real failover emits cred_failover with the documented Reason
// shape "<pool>:<from>-><to>:<tag>"; pool exhaustion emits the distinct
// pool_exhausted action so operators can alert on it separately and are
// not misled by a self-referential transition. Safe with a nil auditLog.
// The blake3 hash chain is appended synchronously by FileLogger.Log; the
// write is local and fast (mirrors logDLPAudit on the same path), so it
// does not warrant detaching like the store/Telegram side effects.
if a.auditLog != nil {
host, port := connectTargetForFlow(a, f)
action := "cred_failover"
reason := fmt.Sprintf("%s:%s->%s:%s", pool, from, to, tag)
if exhausted {
action = "pool_exhausted"
reason = fmt.Sprintf("%s:exhausted:%s", pool, tag)
}
evt := audit.Event{
Destination: host,
Port: port,
Expand All @@ -465,8 +505,8 @@ func (a *SluiceAddon) handlePoolFailover(f *mitmproxy.Flow) {
// scoped pooled binding the audit must record the real protocol.
Protocol: proto,
Verdict: "failover",
Action: "cred_failover",
Reason: fmt.Sprintf("%s:%s->%s:%s", pool, from, to, tag),
Action: action,
Reason: reason,
Credential: from,
}
if err := a.auditLog.Log(evt); err != nil {
Expand All @@ -477,15 +517,46 @@ func (a *SluiceAddon) handlePoolFailover(f *mitmproxy.Flow) {
// (3) Durability + Telegram via the callback. The callback is
// responsible for being non-blocking (it runs the store write and the
// Telegram send in its own goroutine); we still guard with a nil check.
// The durable cooldown is persisted even when exhausted (the member did
// fail); only the operator-facing wording differs.
if a.onFailover != nil {
a.onFailover(FailoverEvent{
Pool: pool,
From: from,
To: to,
Reason: tag,
Class: class,
Until: until,
Epoch: idEpoch,
Pool: pool,
From: from,
To: to,
Reason: tag,
Class: class,
Until: until,
Exhausted: exhausted,
Epoch: idEpoch,
})
}
}

// poolNoticeDedupWindow is the minimum spacing between two emitted notices
// for an identical pool failover / exhaustion signal (same pool, from, to,
// tag). It bounds how often such a signal produces an audit row + operator
// notice. The synchronous in-memory MarkCooldown already switched the active
// member before this fires, so a burst of agent retries within the window is
// genuinely the same event, not new information.
const poolNoticeDedupWindow = 30 * time.Second

// shouldEmitPoolNotice reports whether a notice for the (pool, from, to, tag)
// signal should be emitted now. It answers true when no notice has been
// recorded for that tuple, or when the last recorded one is at least
// poolNoticeDedupWindow old; in that case the current time is recorded as the
// new "last emitted" stamp. A suppressed call records nothing, so it does not
// extend the window.
//
// The whole check-then-record step runs under poolNoticeMu (rather than a
// sync.Map LoadOrStore) so two goroutines in a concurrent burst cannot both
// miss the entry and both emit. Entries are keyed by the NUL-joined tuple;
// the key space is bounded by pool x member x member x tag, so the map does
// not grow unbounded in practice.
func (a *SluiceAddon) shouldEmitPoolNotice(pool, from, to, tag string) bool {
	const sep = "\x00"
	dedupKey := pool + sep + from + sep + to + sep + tag
	stamp := time.Now()

	a.poolNoticeMu.Lock()
	defer a.poolNoticeMu.Unlock()

	// Indexing a nil map is a safe read that yields (zero, false), so the
	// lookup can precede the lazy allocation below.
	prev, seen := a.poolNoticeAt[dedupKey]
	if seen && stamp.Sub(prev) < poolNoticeDedupWindow {
		return false
	}

	if a.poolNoticeAt == nil {
		a.poolNoticeAt = make(map[string]time.Time)
	}
	a.poolNoticeAt[dedupKey] = stamp
	return true
}
160 changes: 160 additions & 0 deletions internal/proxy/pool_failover_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -919,3 +919,163 @@ func TestServerStorePoolStaleGenerationCooldownNotLost(t *testing.T) {
t.Fatalf("ResolveActive = %q, want y (z cooled via stale-gen mark)", got)
}
}

// auditActionCount is a test helper that reads the audit log at logPath
// (expected to belong to an already-closed FileLogger), decodes every
// non-empty line as a JSON audit.Event, and returns how many of those events
// carry an Action equal to want. A read or decode failure aborts the calling
// test via t.Fatalf.
func auditActionCount(t *testing.T, logPath, want string) int {
	t.Helper()

	raw, err := os.ReadFile(logPath)
	if err != nil {
		t.Fatalf("read audit log: %v", err)
	}

	matches := 0
	rows := strings.Split(strings.TrimSpace(string(raw)), "\n")
	for i := range rows {
		row := rows[i]
		if row == "" {
			// An empty file trims down to a single empty row; skip it.
			continue
		}
		var rec audit.Event
		if uerr := json.Unmarshal([]byte(row), &rec); uerr != nil {
			t.Fatalf("unmarshal audit line %q: %v", row, uerr)
		}
		if rec.Action != want {
			continue
		}
		matches++
	}
	return matches
}

// TestFailoverPoolExhaustedNoSelfFailoverSpam guards against the
// self-failover regression. With every member cooling and the
// soonest-recovering one being the member that just failed, ResolveActive
// degrades back to that member, so no distinct failover target exists. The
// previous behavior was a meaningless "memA -> memA" cred_failover plus one
// Telegram notice + audit row per agent retry (in production: six identical
// "failed over X -> X (429)" notices).
//
// Fail-before: the action is cred_failover with From==To, and a second
// Response produces a second row (no dedup). Pass-after: the action is
// pool_exhausted, Exhausted is true, From==To==memA, and the dedup window
// folds the retry storm into exactly one audit row + one onFailover call.
func TestFailoverPoolExhaustedNoSelfFailoverSpam(t *testing.T) {
	auditPath := filepath.Join(t.TempDir(), "audit.log")
	auditLogger, err := audit.NewFileLogger(auditPath)
	if err != nil {
		t.Fatalf("NewFileLogger: %v", err)
	}
	t.Cleanup(func() { _ = auditLogger.Close() })

	addon, _, prPtr := setupPoolAddon(t, "memA", "memB")
	addon.auditLog = auditLogger
	client := setupAddonConn(addon, "auth.example.com:443")

	// Pre-cool memB for longer than a 429 TTL. Once memA takes its own 429
	// cooldown it is the soonest-recovering member, so ResolveActive falls
	// back to memA itself and there is no distinct target.
	prPtr.Load().MarkCooldown("memB", time.Now().Add(10*time.Minute), "401")

	var (
		invocations int32
		captured    FailoverEvent
	)
	fired := make(chan struct{}, 4)
	addon.SetOnFailover(func(ev FailoverEvent) {
		atomic.AddInt32(&invocations, 1)
		captured = ev
		fired <- struct{}{}
	})

	// The agent's retry storm: two identical 429s in a row.
	for attempt := 0; attempt < 2; attempt++ {
		flow := newPoolRespFlow(client, 429, []byte(`{"error":"rate_limited"}`))
		addon.flowInjected.Tag(flow.Id, "memA")
		addon.Response(flow)
	}

	select {
	case <-time.After(2 * time.Second):
		t.Fatal("onFailover callback not invoked")
	case <-fired:
	}

	// The second identical signal falls inside the dedup window and must be
	// suppressed. Each case ends the test, so only the first mismatch reports.
	switch n := atomic.LoadInt32(&invocations); {
	case n != 1:
		t.Fatalf("onFailover invoked %d times, want exactly 1 (dedup window must collapse the retry storm)", n)
	case !captured.Exhausted:
		t.Fatalf("FailoverEvent.Exhausted = false, want true (no distinct failover target)")
	case !(captured.From == "memA" && captured.To == "memA"):
		t.Fatalf("FailoverEvent from=%q to=%q, want memA/memA (degraded to self)", captured.From, captured.To)
	}

	// Close before reading so the audit file is complete on disk.
	if cerr := auditLogger.Close(); cerr != nil {
		t.Fatalf("logger close: %v", cerr)
	}
	exhaustedRows := auditActionCount(t, auditPath, "pool_exhausted")
	if exhaustedRows != 1 {
		t.Fatalf("pool_exhausted audit rows = %d, want exactly 1 (no per-retry spam)", exhaustedRows)
	}
	failoverRows := auditActionCount(t, auditPath, "cred_failover")
	if failoverRows != 0 {
		t.Fatalf("cred_failover audit rows = %d, want 0 (a self-failover is NOT a real failover)", failoverRows)
	}
}

// TestFailoverToManualRotateParkedPeer guards against the pool-stranding
// regression that broke the live agent. `sluice pool rotate` parks the
// previously-active member with reason ManualRotateReason; that member is
// healthy, merely deprioritized by the operator. If the rotated-to member
// then 429s, EVERY member is cooling. The old soonest-by-time degrade chose
// the just-failed member (60s 429 TTL < 300s rotate park), self-looped, and
// hard-failed the agent.
//
// Fail-before: To == memA (self-loop) with Exhausted true. Pass-after: the
// operator-parked-but-healthy memB wins, giving a REAL failover memA -> memB
// with Exhausted false and a cred_failover audit row.
func TestFailoverToManualRotateParkedPeer(t *testing.T) {
	auditPath := filepath.Join(t.TempDir(), "audit.log")
	auditLogger, err := audit.NewFileLogger(auditPath)
	if err != nil {
		t.Fatalf("NewFileLogger: %v", err)
	}
	t.Cleanup(func() { _ = auditLogger.Close() })

	addon, _, prPtr := setupPoolAddon(t, "memA", "memB")
	addon.auditLog = auditLogger
	client := setupAddonConn(addon, "auth.example.com:443")

	// Simulate an operator rotate onto memA: memB is parked (healthy, only
	// deprioritized) for the long manual-rotate TTL, leaving memA active.
	parkedUntil := time.Now().Add(vault.AuthFailCooldown)
	prPtr.Load().MarkCooldown("memB", parkedUntil, vault.ManualRotateReason)
	active, _ := prPtr.Load().ResolveActive("codex_pool")
	if active != "memA" {
		t.Fatalf("pre-failover active = %q, want memA (memB operator-parked)", active)
	}

	var observed FailoverEvent
	fired := make(chan struct{}, 1)
	addon.SetOnFailover(func(ev FailoverEvent) {
		observed = ev
		fired <- struct{}{}
	})

	flow := newPoolRespFlow(client, 429, []byte(`{"error":"rate_limited"}`))
	addon.flowInjected.Tag(flow.Id, "memA")
	addon.Response(flow)

	select {
	case <-time.After(2 * time.Second):
		t.Fatal("onFailover callback not invoked")
	case <-fired:
	}

	// Each case ends the test, so only the first mismatch is reported.
	switch {
	case observed.Exhausted:
		t.Fatalf("FailoverEvent.Exhausted = true, want false (memB is a valid parked-but-healthy target)")
	case !(observed.From == "memA" && observed.To == "memB"):
		t.Fatalf("FailoverEvent from=%q to=%q, want memA -> memB (failover to operator-parked-but-healthy peer)", observed.From, observed.To)
	}

	// Close before reading so the audit file is complete on disk.
	if cerr := auditLogger.Close(); cerr != nil {
		t.Fatalf("logger close: %v", cerr)
	}
	failoverRows := auditActionCount(t, auditPath, "cred_failover")
	if failoverRows != 1 {
		t.Fatalf("cred_failover audit rows = %d, want exactly 1 (real failover to memB)", failoverRows)
	}
	exhaustedRows := auditActionCount(t, auditPath, "pool_exhausted")
	if exhaustedRows != 0 {
		t.Fatalf("pool_exhausted audit rows = %d, want 0 (a healthy parked peer exists)", exhaustedRows)
	}
}
Loading
Loading