pkg/election: use native lease keepalive stream #10618
JmPotato wants to merge 8 commits into tikv:master from
Conversation
Signed-off-by: JmPotato <github@ipotato.me>
[APPROVALNOTIFIER] This PR is NOT APPROVED. This pull-request has been approved by: The full list of commands accepted by this bot can be found here. Details: Needs approval from an approver in each of these files. Approvers can indicate their approval by writing
Note: Reviews paused. It looks like this branch is under active development. To avoid overwhelming you with review comments due to an influx of new commits, CodeRabbit has automatically paused this review. You can configure this behavior by changing the corresponding setting.
📝 Walkthrough
Lease keep-alive was rewritten from a ticker-based KeepAliveOnce to a streaming keepalive worker driving etcd keepalives, and added a persistent …
Changes
Sequence Diagram
sequenceDiagram
participant Caller as KeepAlive Caller
participant Worker as Keep-Alive Worker
participant Etcd as etcd Lease
participant RespCh as KeepAlive Resp Channel
Caller->>Worker: Start KeepAlive(ctx)
Worker->>Etcd: KeepAlive(ctx, leaseID) -> stream
Etcd->>RespCh: send KeepAliveResponse
RespCh->>Worker: deliver response
Worker->>Worker: validate TTL (>0)
Worker->>Worker: compute expireTime (arrival time)
Worker->>Caller: publish/coalesce expireTime (size-1 buffer)
alt stream closed or ctx canceled
RespCh->>Worker: channel closed
Worker->>Worker: stop and close update channel
end
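The size-1 coalescing publish shown in the diagram can be pictured with a small Go sketch — illustrative only, with hypothetical names (`publishCoalesced`, `ch`), not code from this PR:

```go
package election

import "time"

// publishCoalesced delivers the newest expire-time estimate through a
// 1-slot channel. If the consumer has not read the previous value yet,
// the stale value is dropped so the consumer always sees the latest one.
func publishCoalesced(ch chan time.Time, expire time.Time) {
	for {
		select {
		case ch <- expire:
			return
		default:
			// Slot is occupied: discard the stale value, then retry the send.
			select {
			case <-ch:
			default:
			}
		}
	}
}
```

With a single producer and a buffered channel of size 1, the retry loop terminates after at most one drain.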
Estimated code review effort: 🎯 4 (Complex) | ⏱️ ~45 minutes
🚥 Pre-merge checks | ✅ 4 | ❌ 1
❌ Failed checks (1 warning)
✅ Passed checks (4 passed)
Signed-off-by: JmPotato <github@ipotato.me>
Caution
Some comments are outside the diff and can’t be posted inline due to platform limitations.
⚠️ Outside diff range comments (1)
pkg/election/lease.go (1)
64-70: ⚠️ Potential issue | 🟠 Major — Guard `l.lease` with synchronization to prevent concurrent writes during Close/Grant-after-Close.

The `lease` field is accessed without synchronization: `Grant()` writes it (line 71), while `keepAliveWorker` reads it in a spawned goroutine (line 195), and `Close()` reads it from the main thread (lines 104, 107). The atomic `closed` flag only protects the boolean, not the pointer itself. This creates a data race when Grant-after-Close reuses the same `Lease` instance while a `keepAliveWorker` goroutine or concurrent `Close()` is still accessing the old `l.lease`.

Add a mutex (or RWMutex) to guard reads and writes to `l.lease`, or ensure `Close()` waits for any running `keepAliveWorker` goroutine to exit before allowing reuse (e.g., via a `sync.WaitGroup` tracked by the `KeepAlive` caller).

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@pkg/election/lease.go` around lines 64 - 70, The Lease.Grant routine writes l.lease without synchronization while keepAliveWorker (spawned goroutine) and Close read it, causing a race; protect accesses by adding a lock or waiter: introduce a sync.RWMutex (e.g., leaseMu) on the Lease struct and wrap all reads/writes of l.lease in leaseMu.RLock()/RUnlock() and leaseMu.Lock()/Unlock() in Grant, Close, and keepAliveWorker, or alternatively add a sync.WaitGroup tracked by KeepAlive and have Close wait for the worker to finish before Grant can replace l.lease; update Grant, Close, and keepAliveWorker to use the chosen synchronization primitive consistently.
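To make the mutex option concrete, here is a minimal sketch under the assumption that the lessor handle lives on the struct; the `guardedLease` type and its accessor names are hypothetical, not code from this PR:

```go
package election

import (
	"sync"

	clientv3 "go.etcd.io/etcd/client/v3"
)

// guardedLease serializes access to the lessor handle so Grant can replace
// it while Close and the keepalive worker read it without a data race.
type guardedLease struct {
	mu    sync.RWMutex
	lease clientv3.Lease
}

// setLease would be called from Grant when (re)creating the lessor.
func (g *guardedLease) setLease(l clientv3.Lease) {
	g.mu.Lock()
	defer g.mu.Unlock()
	g.lease = l
}

// getLease would be called from Close and keepAliveWorker before any use.
func (g *guardedLease) getLease() clientv3.Lease {
	g.mu.RLock()
	defer g.mu.RUnlock()
	return g.lease
}
```

Alternatively, a `sync.WaitGroup` tracked by the `KeepAlive` caller would let `Close()` wait for the worker to exit before `Grant()` replaces the handle.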
🧹 Nitpick comments (4)
pkg/election/lease_test.go (2)
216-247: fakeLease implementation looks good.

Clean, minimal `clientv3.Lease` fake scoped to this test file; the `keepAliveErr` field is already plumbed for future error-path tests (e.g., when `l.lease.KeepAlive` itself returns an error at lease.go:187-191). If you want to lock that branch in now, a quick test that sets `keepAliveErr` and asserts the worker's channel closes immediately would cover the remaining branch in `keepAliveWorker`.

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@pkg/election/lease_test.go` around lines 216 - 247, Add a test that exercises the error branch where l.lease.KeepAlive returns an error by creating a fakeLease with keepAliveErr set and keepAliveCh non-nil, starting the keepAliveWorker (or calling the function that consumes KeepAlive) and asserting that the worker's output/monitor channel closes or the worker exits immediately; reference fakeLease, keepAliveErr, KeepAlive, keepAliveCh and keepAliveWorker to locate the code paths to trigger and assert the immediate close/exit behavior.
154-163: Minor: inconsistent `require` usage.

Other tests in this file use `re := require.New(t)` and call `re.True`/`re.False`/etc. This one passes `t` directly through to `requireClosedTimeCh`, which then calls `require.False(t, ok)`. Not a bug — just a stylistic inconsistency inside the file.

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@pkg/election/lease_test.go` around lines 154 - 163, The test TestLeaseKeepAliveWorkerExitsOnInvalidTTL is inconsistent with the file's require style; change it to use a require.Assertions instance (re := require.New(t)) instead of passing t into requireClosedTimeCh. Either update the test to perform the closed-channel check inline using re.False(ok) after attempting a receive from timeCh, or modify requireClosedTimeCh to accept a *require.Assertions and call re.False(ok) there; reference TestLeaseKeepAliveWorkerExitsOnInvalidTTL, require.New, requireClosedTimeCh, and the use of require.False when making the assertion.

pkg/election/lease.go (2)
129-140: Redundant shutdown log messages.

Line 129 already defers "lease keep alive stopped" on `KeepAlive` exit, and line 181 defers "stop lease keep alive worker" inside the worker. The additional "lease keep alive worker stopped" at line 138 is almost a duplicate of line 181 from the consumer side and adds little diagnostic value beyond what the existing defers provide. Consider dropping it (or converting to a Debug-level log) to reduce noise.

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@pkg/election/lease.go` around lines 129 - 140, In the KeepAlive method, remove (or demote to Debug) the redundant log.Info call inside the select case that handles timeCh closure—specifically the log line with message "lease keep alive worker stopped" that references l.Purpose—because the method already defers "lease keep alive stopped" at the top and the worker already defers "stop lease keep alive worker"; update the select branch to simply return (or log.Debug instead) to avoid noisy duplicate info-level messages.
172-174: Drop the unused `time.Duration` parameter.

The second argument is no longer used now that the worker relies on the etcd keep-alive stream rather than a tick interval. Keeping it as `_` is misleading to callers (line 128 passes `l.leaseTimeout/3`; tests pass arbitrary durations) and invites drift. Consider simplifying the signature and updating callers.

♻️ Proposed refactor

```diff
-// keepAliveWorker keeps the lease alive through the etcd lease keepalive stream.
-func (l *Lease) keepAliveWorker(ctx context.Context, _ time.Duration) <-chan time.Time {
+// keepAliveWorker keeps the lease alive through the etcd lease keepalive stream.
+func (l *Lease) keepAliveWorker(ctx context.Context) <-chan time.Time {
```

Caller in `KeepAlive`:

```diff
- timeCh := l.keepAliveWorker(ctx, l.leaseTimeout/3)
+ timeCh := l.keepAliveWorker(ctx)
```

And the test call sites in lease_test.go (lines 96, 159, 171) would need the duration removed as well.

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@pkg/election/lease.go` around lines 172 - 174, The keepAliveWorker function currently accepts an unused time.Duration parameter; remove the second parameter from the keepAliveWorker signature (change func (l *Lease) keepAliveWorker(ctx context.Context, _ time.Duration) <-chan time.Time to func (l *Lease) keepAliveWorker(ctx context.Context) <-chan time.Time) and update all callers to drop the duration argument—specifically update the KeepAlive method that calls l.keepAliveWorker(ctx, l.leaseTimeout/3) to call l.keepAliveWorker(ctx) and remove the extra duration arguments from the test call sites in lease_test.go (the lines that currently pass durations). Rebuild tests to ensure no other callers remain and adjust any references to the old signature accordingly.
ℹ️ Review info
⚙️ Run configuration
Configuration used: defaults
Review profile: CHILL
Plan: Pro
Run ID: 662e0d8b-ffe6-4021-a5ab-49259c6e7696
📒 Files selected for processing (2)
pkg/election/lease.go, pkg/election/lease_test.go
Signed-off-by: JmPotato <github@ipotato.me>
Signed-off-by: JmPotato <github@ipotato.me>
Signed-off-by: JmPotato <github@ipotato.me>
🧹 Nitpick comments (2)
pkg/election/lease.go (2)
192-310: LGTM — solid stream-consumer design.

Good separation of concerns: single-producer coalesced channel (so the outer loop only ever has to process the latest estimate), explicit `ttl <= 0` and channel-closed exits so a lost lease isn't silently masked, and the `ch <- expire` send is protected by `ctx.Done()` so the worker can't leak if the outer loop has already returned. The `stopReason` funneled into a single deferred log is a nice touch for post-mortem visibility.

One small optional thought: the "drop stale lease keep alive expire time because outer loop is slow" warn (lines 292-297) will emit one warning per keepalive response whenever the outer loop lags by even a single cycle. At the current TTL/3 cadence that's fine, but if you ever tighten the cadence consider gating it behind a "dropped more than N in a row" counter or a rate limiter to avoid log amplification under transient backpressure.
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@pkg/election/lease.go` around lines 192 - 310, In keepAliveWorker, the "drop stale lease keep alive expire time because outer loop is slow" warn (when reading oldExpire from ch) can spam logs on every keepalive; modify the logic in keepAliveWorker so that instead of unconditionally logging each drop you track consecutive drops (e.g., a droppedInRow counter) or use a simple rate limiter and only emit the warning when drops exceed a threshold (or rate limit allows); update the branch that reads oldExpire from ch and the associated log call to consult that counter/limiter and reset it when a send succeeds so you reduce log amplification while preserving visibility of persistent backpressure.
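One possible shape for the gated warning — a sketch only, where `dropWarnThreshold`, the counter handling, and the 1-slot channel are assumptions rather than code from the PR:

```go
package election

import (
	"time"

	"github.com/pingcap/log"
	"go.uber.org/zap"
)

// dropWarnThreshold is an illustrative value, not one taken from the PR.
const dropWarnThreshold = 3

// sendLatest publishes the newest expire estimate into a 1-slot channel and
// only warns once the consumer has lagged for several keepalive cycles in a row.
func sendLatest(ch chan time.Time, expire time.Time, droppedInRow *int) {
	select {
	case ch <- expire:
		*droppedInRow = 0 // consumer caught up, reset the counter
	default:
		// Drop the stale value so the slot holds the latest estimate.
		select {
		case old := <-ch:
			*droppedInRow++
			if *droppedInRow >= dropWarnThreshold {
				log.Warn("drop stale lease keep alive expire time because outer loop is slow",
					zap.Time("dropped-expire", old),
					zap.Int("dropped-in-row", *droppedInRow))
			}
		default:
		}
		ch <- expire
	}
}
```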
68-72: Document the sequential Grant/Close contract for `l.lease` reinitialization.

`closed` is atomic, but the `l.lease = clientv3.NewLease(l.client)` reassignment and subsequent reads of `l.lease` in `Close()` (lines 104, 107) and `keepAliveWorker` (line 220) are unsynchronized. In today's callers (`Reset()` cancels the keepalive ctx before `Close()`, and `Campaign()` allocates a fresh `Lease` per run), this is safe, but the invariant is only implicit. A future caller that reuses a `Lease` across overlapping Close/Grant calls, or calls `KeepAlive()` before the previous worker has observed ctx cancellation, would race on the interface value.

Consider either a short godoc on `Grant`/`Close` stating "not safe for concurrent use on the same `Lease`", or guarding the `l.lease` handle with a small mutex / `atomic.Pointer[clientv3.Lease]` so the reuse contract is enforced by the type.

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@pkg/election/lease.go` around lines 68 - 72, The code reassigns l.lease without synchronization causing a potential data race between Grant/Close and keepAliveWorker; document the contract or add synchronization. Either add a short godoc on Grant/Close (e.g., on the Lease type or Grant/Close methods) stating "Grant and Close are not safe for concurrent use on the same Lease; callers must serialize Grant/Close/KeepAlive/Reset," or protect the l.lease handle by introducing a small mutex or using atomic.Pointer[clientv3.Lease] and update accesses in Grant, Close, keepAliveWorker, Reset, and any KeepAlive/Grant callers to load/store the lease via that guard so reassignment and reads are synchronized.
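For the `atomic.Pointer` variant mentioned above, a sketch of how the handle could be stored — the wrapper type and method names here are hypothetical:

```go
package election

import (
	"sync/atomic"

	clientv3 "go.etcd.io/etcd/client/v3"
)

// leaseHandle keeps the lessor behind an atomic.Pointer so Grant can swap it
// and Close / keepAliveWorker can load it without extra locking.
type leaseHandle struct {
	lease atomic.Pointer[clientv3.Lease]
}

func (h *leaseHandle) store(l clientv3.Lease) {
	h.lease.Store(&l)
}

func (h *leaseHandle) load() clientv3.Lease {
	if p := h.lease.Load(); p != nil {
		return *p
	}
	return nil
}
```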
Signed-off-by: JmPotato <github@ipotato.me>
Caution
Some comments are outside the diff and can’t be posted inline due to platform limitations.
⚠️ Outside diff range comments (1)
pkg/election/lease.go (1)
95-110: ⚠️ Potential issue | 🟡 Minor — Potential race on `expireTime` with a concurrent `KeepAlive` loop.

`Close()` sets `expireTime = ZeroTime` on line 96, but it does not cancel the `KeepAlive` loop's context. If `KeepAlive()` is running when `Close()` is called, the outer loop may still process a pending value from `timeCh` and overwrite `expireTime` with a non-zero value at line 160 (the `ctx.Done()` check on line 157 only guards the input ctx, not `Close`). `IsExpired()` would then briefly return false after `Close()`.

The window is short (the worker exits as soon as `l.lease.Close()` shuts the keepalive stream), and in the election flow the owner is already stepping down, so this is a minor issue rather than a blocker. If you want to make it tight, one option is to also re-store `ZeroTime` after `l.lease.Close()` has returned, or have the outer `KeepAlive` loop re-check `l.closed.Load()` before storing.

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@pkg/election/lease.go` around lines 95 - 110, Close() currently stores ZeroTime into l.expireTime but a concurrent KeepAlive() can still write a later non-zero expireTime from timeCh; after l.lease.Close() returns re-store ZeroTime into l.expireTime (or have KeepAlive check l.closed.Load() before writing) to ensure IsExpired() cannot briefly return false post-Close; update the Close() implementation (around the l.expireTime.Store call and after l.lease.Close()) and/or add a l.closed.Load() check in KeepAlive()'s outer loop before assigning to expireTime so the final stored value remains ZeroTime.
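A sketch of the "re-store ZeroTime after the lessor is closed" option — assuming `expireTime` is an `atomic.Value` holding a `time.Time`, and using a local zero-time sentinel plus a stand-in close function in place of the repo's `typeutil.ZeroTime` and `l.lease.Close()`:

```go
package election

import (
	"sync/atomic"
	"time"
)

type closableLease struct {
	expireTime  atomic.Value // holds time.Time
	closeLessor func() error // stands in for l.lease.Close()
}

// Close clears the local estimate, shuts the lessor down, and then stores the
// sentinel again so a late write from a still-running KeepAlive loop cannot
// leave a future expire time behind once the stream is gone.
func (l *closableLease) Close() error {
	zero := time.Time{} // stands in for typeutil.ZeroTime
	l.expireTime.Store(zero)
	err := l.closeLessor()
	l.expireTime.Store(zero)
	return err
}
```

The other option from the comment — having the outer loop re-check `l.closed.Load()` before storing — closes the same window from the producer side.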
🧹 Nitpick comments (1)
pkg/election/lease.go (1)
199-230: Minor: early-exit path produces two log lines for the same event.

When `l.lease.KeepAlive` fails on line 220, the explicit `log.Warn("lease keep alive stream failed", ...)` at lines 223-228 fires, and then the deferred "stop lease keep alive worker" log at lines 213-218 also fires with `reason="keepalive_error"`. Both carry useful context but describe the same failure.

Not a correctness issue; consider either suppressing the deferred stop-log on this fast path or dropping the explicit Warn in favor of the deferred one enriched with the error (e.g. by capturing `err` in a closure variable). Up to you — leaving as-is is also fine for diagnostics.

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@pkg/election/lease.go` around lines 199 - 230, The explicit log.Warn after l.lease.KeepAlive duplicates the deferred "stop lease keep alive worker" log; remove the explicit Warn and instead capture the error into a closure variable (e.g., keepAliveErr) and set stopReason = "keepalive_error" so the deferred function (the stop log) emits the failure once and can include the captured error when stopReason == "keepalive_error"; update the deferred log in the goroutine to append the captured error (if any) to its fields.
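If the second option is preferred, the error could be funneled into the single deferred stop log along these lines — a sketch with an assumed `keepAliveFn` stand-in, since the real call returns an etcd response channel:

```go
package election

import (
	"context"

	"github.com/pingcap/log"
	"go.uber.org/zap"
)

// runKeepAliveWorker shows how a keepalive failure can be reported once,
// through the deferred stop log, instead of an extra Warn on the fast path.
func runKeepAliveWorker(ctx context.Context, keepAliveFn func(context.Context) error) {
	var (
		stopReason   = "context_done"
		keepAliveErr error
	)
	defer func() {
		fields := []zap.Field{zap.String("reason", stopReason)}
		if keepAliveErr != nil {
			fields = append(fields, zap.Error(keepAliveErr))
		}
		log.Info("stop lease keep alive worker", fields...)
	}()

	if err := keepAliveFn(ctx); err != nil {
		// No separate log.Warn here; the deferred log above carries the error.
		stopReason, keepAliveErr = "keepalive_error", err
		return
	}
	// ... consume the keepalive stream until ctx is done or it closes ...
}
```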
Actionable comments posted: 1
🧹 Nitpick comments (1)
pkg/election/lease.go (1)
68-72: Unsynchronized `l.lease` field creates a latent race between `Grant()` and `Close()`.

`Grant()` reassigns `l.lease` at line 71, while `Close()` reads it at lines 104 and 107 with no synchronization. More critically, `Close()` calls `l.lease.Close()` at line 107 before setting `l.closed.Store(true)` at line 108. Between these lines, a concurrent `Grant()` call could observe `closed == false`, skip lease recreation, and then attempt to use an already-closed lessor.

Current callers follow an ordered pattern — `Reset()` cancels the keepalive context before calling `Close()`, so the KeepAlive goroutine exits before the close happens — preventing concurrent access today. However, this ordering is implicit rather than enforced. Consider storing the lessor in an `atomic.Pointer[clientv3.Lease]` or guarding with a mutex to make the synchronization contract explicit, especially given the reuse-after-close pattern at lines 70–72 increases the surface area for future misuse.

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@pkg/election/lease.go` around lines 68 - 72, Grant() currently reassigns l.lease without synchronization while Close() reads and closes l.lease, creating a race; change the lease storage to be synchronized (either use an atomic.Pointer[clientv3.Lease] for l.lease or protect accesses with a mutex) and update Grant(), Close(), and any readers (e.g., Reset(), the KeepAlive goroutine) to load/store the lease via that synchronized mechanism and to set l.closed before closing the lessor so Close() cannot race with Grant(); ensure all places that read or replace l.lease use the chosen atomic/mutex accessor methods to avoid reuse-after-close races.
ℹ️ Review info
⚙️ Run configuration
Configuration used: defaults
Review profile: CHILL
Plan: Pro
Run ID: 8b22a3ec-d1a2-46b0-8e89-b6ec6686d944
📒 Files selected for processing (2)
pkg/election/lease.go, pkg/election/lease_test.go
🚧 Files skipped from review as they are similar to previous changes (1)
- pkg/election/lease_test.go
```go
case t, ok := <-timeCh:
	if !ok {
		log.Info("lease keep alive worker stopped",
			zap.String("purpose", l.Purpose),
			zap.Time("actual-expire", l.getExpireTime()))
		return
	}
```
Worker-initiated exit leaves expireTime stale.
On this path (triggered when the worker closes ch due to invalid TTL at line 252 or keepalive stream closure at line 237), KeepAlive() returns immediately while l.expireTime still holds the last successful renewal's value. Unlike the watchdog case below (which only returns once time.Until(actualExpire) <= 0 per lines 175–178), here expireTime can remain in the future for up to leaseTimeout. Subsequent IsExpired()/Leadership.Check() calls will report the lease as still valid until expireTime naturally passes, even though we've already observed definitive signs of lease loss (e.g., TTL <= 0).
The previous ticker-based design only exited via the watchdog, so expireTime was naturally correlated with exit. This is a subtle behavior change introduced by the new early-exit paths.
Consider explicitly invalidating the local estimate on the lease-loss signal so the owner steps down immediately:
🛡️ Proposed fix
```diff
 case t, ok := <-timeCh:
 	if !ok {
 		log.Info("lease keep alive worker stopped",
 			zap.String("purpose", l.Purpose),
 			zap.Time("actual-expire", l.getExpireTime()))
+		// Worker signalled a definitive stop (invalid TTL / stream closed).
+		// Clear the local estimate so IsExpired() reports the lease as
+		// expired immediately rather than waiting for natural expiry.
+		l.expireTime.Store(typeutil.ZeroTime)
 		return
 	}
```

Alternatively, distinguish invalid-TTL from transient channel closure (the latter may still have a valid server-side lease) and only invalidate on the definitive case.
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.
In `@pkg/election/lease.go` around lines 146 - 152, Worker exit via the timeCh
branch can leave l.expireTime set to a future value causing
IsExpired()/Leadership.Check() to incorrectly report the lease as valid; in
KeepAlive(), when the worker observes definitive lease-loss signals (e.g., TTL
<= 0 or explicit keepalive stream closure detected before returning from the
timeCh receive case), explicitly invalidate the local estimate by setting
l.expireTime = time.Time{} (or another sentinel that IsExpired recognizes)
before returning from KeepAlive(), or alternatively add a boolean flag on the
lease (e.g., l.locallyLost) set in that branch and checked by
IsExpired()/Leadership.Check(); update references to KeepAlive(), timeCh,
l.expireTime, IsExpired(), and Leadership.Check() accordingly so the owner steps
down immediately on the definitive loss path.
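The "alternatively" route — clearing the local estimate only on the definitive invalid-TTL signal — could look roughly like this; the function name and the `atomic.Value` storage are assumptions for the sketch, not the PR's actual code:

```go
package election

import (
	"sync/atomic"
	"time"

	clientv3 "go.etcd.io/etcd/client/v3"
)

// handleKeepAliveResp reports whether renewal should continue. Only the
// definitive loss signal (TTL <= 0) force-expires the local estimate; a closed
// stream stops renewal but leaves the server-side lease to expire naturally.
func handleKeepAliveResp(resp *clientv3.LeaseKeepAliveResponse, ok bool, expireTime *atomic.Value) bool {
	if !ok {
		// Transient stream closure: stop renewing, keep the current estimate.
		return false
	}
	if resp.TTL <= 0 {
		// Definitive loss: invalidate immediately so IsExpired() flips now.
		expireTime.Store(time.Time{})
		return false
	}
	expireTime.Store(time.Now().Add(time.Duration(resp.TTL) * time.Second))
	return true
}
```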
Fold keepAliveWorker into Lease.KeepAlive so lease renewal runs in a single goroutine whose select multiplexes ctx cancellation, the etcd keepalive channel, and the leaseTimeout watchdog. The previous worker goroutine and its 1-slot output channel existed only to funnel responses into the outer loop; consuming the etcd channel directly achieves the same thing without the extra hop.

- Extract processKeepAliveResponse, loadLeaseID and resetTimer so each helper carries a single responsibility.
- Bake purpose and lease-id into a scoped logger (log.L().With) so every keepalive log line carries them without repeating fields at the call site. Existing log strings and field names are preserved for operator alerts.
- Document the closed field: Close() permanently halts the etcd lessor, so Grant() rebuilds l.lease on reuse to avoid native KeepAlive returning ErrKeepAliveHalted immediately.
- Rewrite tests that referenced keepAliveWorker to exercise Lease.KeepAlive directly, and add TestLeaseKeepAliveKeepsExpireMonotonic to pin the maxExpire guard.

Signed-off-by: JmPotato <github@ipotato.me>
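A rough picture of the single-goroutine shape this commit describes — a sketch under the assumption that renewal just updates an expire-time callback; `keepAliveLoop` and `onRenew` are hypothetical names, not the PR's helpers:

```go
package election

import (
	"context"
	"time"

	clientv3 "go.etcd.io/etcd/client/v3"
)

// keepAliveLoop multiplexes ctx cancellation, the etcd keepalive channel, and
// a leaseTimeout watchdog in a single select, mirroring the commit's design.
func keepAliveLoop(ctx context.Context, respCh <-chan *clientv3.LeaseKeepAliveResponse,
	leaseTimeout time.Duration, onRenew func(expire time.Time)) {
	timer := time.NewTimer(leaseTimeout)
	defer timer.Stop()
	for {
		select {
		case <-ctx.Done():
			return
		case resp, ok := <-respCh:
			if !ok || resp.TTL <= 0 {
				return // stream closed or the lease already expired server-side
			}
			onRenew(time.Now().Add(time.Duration(resp.TTL) * time.Second))
			// Re-arm the watchdog after every successful renewal.
			if !timer.Stop() {
				select {
				case <-timer.C:
				default:
				}
			}
			timer.Reset(leaseTimeout)
		case <-timer.C:
			return // no renewal within leaseTimeout; treat the lease as lost
		}
	}
}
```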
- Use t.Context() for tests whose only cancel() was the defer cleanup, and context.WithCancel(t.Context()) where the test still needs to cancel mid-run so the ctx is tied to the test lifecycle.
- Replace direct lease.expireTime.Load().(time.Time) accesses with the existing getExpireTime() helper so tests go through the same nil-safe path as production callers.
- Drop the ad-hoc Eventually timeouts in favour of the repo-standard testutil.Eventually (20s waitFor / 100ms tick); the custom 1s/10ms and 2s/50ms values only narrowed the failure window and provided no extra signal.

Signed-off-by: JmPotato <github@ipotato.me>
Codecov Report
❌ Patch coverage is …
Additional details and impacted files
@@ Coverage Diff @@
## master #10618 +/- ##
==========================================
+ Coverage 78.92% 78.98% +0.05%
==========================================
Files 532 532
Lines 71862 71990 +128
==========================================
+ Hits 56720 56860 +140
+ Misses 11114 11101 -13
- Partials 4028 4029 +1
Flags with carried forward coverage won't be shown.
What problem does this PR solve?
Issue Number: ref #8552
What is changed and how does it work?
Check List
Tests
Release note
Summary by CodeRabbit
Bug Fixes
Tests