diff --git a/docs/docs/operator/advanced-configuration.mdx b/docs/docs/operator/advanced-configuration.mdx index 60f4763c..903f77d5 100644 --- a/docs/docs/operator/advanced-configuration.mdx +++ b/docs/docs/operator/advanced-configuration.mdx @@ -129,6 +129,17 @@ builder.Services retryPeriod: TimeSpan.FromSeconds(2))); ``` +### Behavior on Leadership Loss + +With `LeaderElectionType.Single`, both the resource watcher **and** the reconciliation queue are gated on leadership. When an instance loses its lease, it performs a **hard stop**: the queue processing loop and any in-flight reconciliations are cancelled immediately. This prevents a former leader from continuing to process resources — including timed requeues and error retries — after another instance has taken over. + +Interrupting a running reconciliation is safe because KubeOps follows the standard Kubernetes model: + +- **Optimistic concurrency** — concurrent writes to the same object are serialized by the API server via `resourceVersion`; a stale write fails with HTTP 409 Conflict. +- **Level-triggered, idempotent reconciliation** — a reconciler converges observed state towards desired state, so an interrupted reconciliation is simply re-run by the new leader against the current state. The lease timings (`LeaseDuration > RenewDeadline`) bound the short overlap window during a leadership transition. + +This mirrors the behavior of the wider operator ecosystem. See the [Kubernetes leases documentation](https://kubernetes.io/docs/concepts/architecture/leases/) for details. + ### Custom Leader Election The `Custom` leader election type allows you to implement your own coordination logic, such as namespace-based leader election. @@ -918,4 +929,4 @@ To completely replace how events are published, add your own `IEventPublisherFac - Check queue permissions and quotas when using an external message broker as a reconciliation trigger - Monitor message processing errors in external trigger consumers - Ensure entities still exist before enqueuing — the Kubernetes API is the authoritative source of truth -- If externally triggered reconciliations are not running, confirm the background service is registered and started correctly +- If externally triggered reconciliations are not running, confirm the background service is registered and started correctly \ No newline at end of file diff --git a/src/KubeOps.Operator/Builder/OperatorBuilder.cs b/src/KubeOps.Operator/Builder/OperatorBuilder.cs index bb4e3755..28815f85 100644 --- a/src/KubeOps.Operator/Builder/OperatorBuilder.cs +++ b/src/KubeOps.Operator/Builder/OperatorBuilder.cs @@ -57,7 +57,16 @@ public IOperatorBuilder AddController() if (Settings.QueueStrategy == QueueStrategy.InMemory) { Services.TryAddSingleton, TimedEntityQueue>(); - Services.AddHostedService>(); + + switch (Settings.LeaderElectionType) + { + case LeaderElectionType.None: + Services.AddHostedService>(); + break; + case LeaderElectionType.Single: + Services.AddHostedService>(); + break; + } } // Leader Election diff --git a/src/KubeOps.Operator/Queue/EntityQueueBackgroundService{TEntity}.cs b/src/KubeOps.Operator/Queue/EntityQueueBackgroundService{TEntity}.cs index ee22f2cf..466443da 100644 --- a/src/KubeOps.Operator/Queue/EntityQueueBackgroundService{TEntity}.cs +++ b/src/KubeOps.Operator/Queue/EntityQueueBackgroundService{TEntity}.cs @@ -59,7 +59,7 @@ namespace KubeOps.Operator.Queue; /// unbounded task accumulation. /// /// -internal sealed class EntityQueueBackgroundService( +public class EntityQueueBackgroundService( ActivitySource activitySource, IKubernetesClient client, OperatorSettings operatorSettings, @@ -68,12 +68,12 @@ internal sealed class EntityQueueBackgroundService( ILogger> logger) : IHostedService, IDisposable, IAsyncDisposable where TEntity : IKubernetesObject { - private readonly CancellationTokenSource _cts = new(); private readonly ConcurrentDictionary _uidEntries = new(); private readonly SemaphoreSlim _parallelismSemaphore = new( operatorSettings.ParallelReconciliation.MaxParallelReconciliations, operatorSettings.ParallelReconciliation.MaxParallelReconciliations); + private CancellationTokenSource _cts = new(); private volatile bool _disposed; /// @@ -83,8 +83,16 @@ internal sealed class EntityQueueBackgroundService( /// cancellation is managed via an internal that is signaled by . /// This avoids cancelling the scheduled work during the host startup phase. /// - public Task StartAsync(CancellationToken cancellationToken) + public virtual Task StartAsync(CancellationToken cancellationToken) { + // Re-create the cancellation token source when it was cancelled by a previous StopAsync. + // This allows the processing loop to be restarted (e.g. when leadership is re-acquired). + if (_cts.IsCancellationRequested) + { + _cts.Dispose(); + _cts = new(); + } + // The current implementation of IHostedService expects that StartAsync is "really" asynchronous. // Blocking calls are not allowed, they would stop the rest of the startup flow. // @@ -101,7 +109,7 @@ public Task StartAsync(CancellationToken cancellationToken) } /// - public Task StopAsync(CancellationToken cancellationToken) + public virtual Task StopAsync(CancellationToken cancellationToken) => _disposed ? Task.CompletedTask : _cts.CancelAsync(); @@ -109,23 +117,8 @@ public Task StopAsync(CancellationToken cancellationToken) /// public void Dispose() { - _cts.Dispose(); - _parallelismSemaphore.Dispose(); - - lock (_uidEntries) - { - foreach (var entry in _uidEntries.Values) - { - entry.Semaphore.Dispose(); - } - - _uidEntries.Clear(); - } - - client.Dispose(); - queue.Dispose(); - - _disposed = true; + Dispose(true); + GC.SuppressFinalize(this); } /// @@ -159,7 +152,37 @@ static async ValueTask CastAndDispose(IDisposable resource) } } - private async Task> ReconcileSingleAsync(QueueEntry entry, CancellationToken cancellationToken) + /// + /// Releases the resources used by the background service. + /// + /// Whether the method is called from . + protected virtual void Dispose(bool disposing) + { + if (!disposing || _disposed) + { + return; + } + + _cts.Dispose(); + _parallelismSemaphore.Dispose(); + + lock (_uidEntries) + { + foreach (var entry in _uidEntries.Values) + { + entry.Semaphore.Dispose(); + } + + _uidEntries.Clear(); + } + + client.Dispose(); + queue.Dispose(); + + _disposed = true; + } + + protected virtual async Task> ReconcileSingleAsync(QueueEntry entry, CancellationToken cancellationToken) { logger .LogTrace( diff --git a/src/KubeOps.Operator/Queue/LeaderAwareEntityQueueBackgroundService{TEntity}.cs b/src/KubeOps.Operator/Queue/LeaderAwareEntityQueueBackgroundService{TEntity}.cs new file mode 100644 index 00000000..b50ddeec --- /dev/null +++ b/src/KubeOps.Operator/Queue/LeaderAwareEntityQueueBackgroundService{TEntity}.cs @@ -0,0 +1,120 @@ +// Licensed to the .NET Foundation under one or more agreements. +// The .NET Foundation licenses this file to you under the Apache 2.0 License. +// See the LICENSE file in the project root for more information. + +using System.Diagnostics; + +using k8s; +using k8s.LeaderElection; +using k8s.Models; + +using KubeOps.Abstractions.Builder; +using KubeOps.Abstractions.Reconciliation; +using KubeOps.KubernetesClient; + +using Microsoft.Extensions.Logging; + +namespace KubeOps.Operator.Queue; + +/// +/// A leadership-aware variant of . The queue is only +/// consumed while this instance holds leadership; when leadership is lost the processing loop and any +/// in-flight reconciliations are cancelled immediately. +/// +/// The type of the Kubernetes entity being managed. +/// +/// +/// This service deliberately performs a hard stop on leadership loss: cancelling the +/// internal token aborts the dequeue loop as well as any reconciliation that is currently running. This +/// mirrors the behaviour of the wider Kubernetes operator ecosystem — controller-runtime +/// (Kubebuilder / Operator SDK) terminates the whole process when its lease is lost rather than draining +/// work gracefully. +/// +/// +/// Leader election does not guarantee strict mutual exclusion: clock skew, GC pauses or a slow API server +/// can leave an instance acting briefly after its lease has expired. That short transition overlap is +/// expected and is made safe by two properties the SDK already relies on: +/// +/// +/// +/// Optimistic concurrency — concurrent writes to the same object are serialised by the +/// API server via metadata.resourceVersion; a stale write fails with HTTP 409 Conflict. +/// +/// +/// Level-triggered, idempotent reconciliation — a reconciler converges observed state +/// towards desired state, so an interrupted reconciliation is simply re-run by the new leader against the +/// current (possibly partial) state. The lease timing (LeaseDuration > RenewDeadline) bounds the +/// overlap window. +/// +/// +/// +/// References: +/// +/// https://kubernetes.io/docs/concepts/architecture/leases/ +/// https://pkg.go.dev/sigs.k8s.io/controller-runtime/pkg/manager#Options +/// +/// +/// +public class LeaderAwareEntityQueueBackgroundService( + ActivitySource activitySource, + IKubernetesClient client, + OperatorSettings operatorSettings, + ITimedEntityQueue queue, + IReconciler reconciler, + ILogger> logger, + LeaderElector elector) + : EntityQueueBackgroundService( + activitySource, + client, + operatorSettings, + queue, + reconciler, + logger) + where TEntity : IKubernetesObject +{ + public override Task StartAsync(CancellationToken cancellationToken) + { + logger.LogDebug("Subscribe for leadership updates."); + + elector.OnStartedLeading += StartedLeading; + elector.OnStoppedLeading += StoppedLeading; + + return elector.IsLeader() ? base.StartAsync(cancellationToken) : Task.CompletedTask; + } + + public override Task StopAsync(CancellationToken cancellationToken) + { + logger.LogDebug("Unsubscribe from leadership updates."); + + elector.OnStartedLeading -= StartedLeading; + elector.OnStoppedLeading -= StoppedLeading; + + return elector.IsLeader() ? base.StopAsync(cancellationToken) : Task.CompletedTask; + } + + protected override void Dispose(bool disposing) + { + if (disposing) + { + elector.OnStartedLeading -= StartedLeading; + elector.OnStoppedLeading -= StoppedLeading; + } + + base.Dispose(disposing); + } + + private void StartedLeading() + { + logger.LogInformation("This instance started leading, starting queue processing."); + _ = base.StartAsync(CancellationToken.None); + } + + private void StoppedLeading() + { + logger.LogInformation("This instance stopped leading, stopping queue processing."); + + // Hard stop: cancelling the internal token aborts the dequeue loop and any in-flight + // reconciliation. See the class remarks for why interrupting running reconciliations is safe. + _ = base.StopAsync(CancellationToken.None); + } +} diff --git a/test/KubeOps.Operator.Test/Builder/OperatorBuilderQueueStrategy.Test.cs b/test/KubeOps.Operator.Test/Builder/OperatorBuilderQueueStrategy.Test.cs index 1293a79e..64646285 100644 --- a/test/KubeOps.Operator.Test/Builder/OperatorBuilderQueueStrategy.Test.cs +++ b/test/KubeOps.Operator.Test/Builder/OperatorBuilderQueueStrategy.Test.cs @@ -33,6 +33,26 @@ public void Should_Register_TimedEntityQueue_And_BackgroundService_For_InMemory_ s.ImplementationType == typeof(EntityQueueBackgroundService)); } + [Fact] + public void Should_Register_LeaderAware_BackgroundService_For_Single_LeaderElection() + { + var builder = new OperatorBuilder( + new ServiceCollection(), + new OperatorSettingsBuilder + { + QueueStrategy = QueueStrategy.InMemory, + LeaderElectionType = LeaderElectionType.Single, + }.Build()); + builder.AddController(); + + builder.Services.Should().Contain(s => + s.ServiceType == typeof(IHostedService) && + s.ImplementationType == typeof(LeaderAwareEntityQueueBackgroundService)); + builder.Services.Should().NotContain(s => + s.ServiceType == typeof(IHostedService) && + s.ImplementationType == typeof(EntityQueueBackgroundService)); + } + [Fact] public void Should_Not_Register_TimedEntityQueue_Or_BackgroundService_For_Custom_Strategy() { diff --git a/test/KubeOps.Operator.Test/Queue/LeaderAwareEntityQueueBackgroundService.Test.cs b/test/KubeOps.Operator.Test/Queue/LeaderAwareEntityQueueBackgroundService.Test.cs new file mode 100644 index 00000000..9e424c25 --- /dev/null +++ b/test/KubeOps.Operator.Test/Queue/LeaderAwareEntityQueueBackgroundService.Test.cs @@ -0,0 +1,147 @@ +// Licensed to the .NET Foundation under one or more agreements. +// The .NET Foundation licenses this file to you under the Apache 2.0 License. +// See the LICENSE file in the project root for more information. + +using System.Reflection; + +using FluentAssertions; + +using KubeOps.Abstractions.Builder; +using KubeOps.Abstractions.Reconciliation; +using KubeOps.KubernetesClient; +using KubeOps.Operator.Queue; +using KubeOps.Operator.Test.TestEntities; + +using Microsoft.Extensions.Logging; + +using Moq; + +using ILock = k8s.LeaderElection.ILock; +using LeaderElectorType = k8s.LeaderElection.LeaderElector; + +namespace KubeOps.Operator.Test.Queue; + +public sealed class LeaderAwareEntityQueueBackgroundServiceTest +{ + [Fact] + public async Task StartedLeading_Should_Begin_Consuming_Queue() + { + var queue = new CapturingQueue(); + await using var service = CreateService(queue); + + await service.StartAsync(TestContext.Current.CancellationToken); + service.SimulateStartedLeading(); + + await queue.EnumeratorStarted.WaitAsync(TimeSpan.FromSeconds(5), TestContext.Current.CancellationToken); + + queue.CapturedToken.IsCancellationRequested.Should().BeFalse(); + } + + [Fact] + public async Task StoppedLeading_Should_Hard_Stop_Queue_Processing() + { + var queue = new CapturingQueue(); + await using var service = CreateService(queue); + + await service.StartAsync(TestContext.Current.CancellationToken); + service.SimulateStartedLeading(); + await queue.EnumeratorStarted.WaitAsync(TimeSpan.FromSeconds(5), TestContext.Current.CancellationToken); + + service.SimulateStoppedLeading(); + + // Cancelling the internal token must abort the in-flight queue consumption. + await queue.EnumeratorCancelled.WaitAsync(TimeSpan.FromSeconds(5), TestContext.Current.CancellationToken); + queue.CapturedToken.IsCancellationRequested.Should().BeTrue(); + } + + private static TestableService CreateService(CapturingQueue queue) + { + var lockMock = new Mock(); + lockMock + .Setup(l => l.GetAsync(It.IsAny())) + .Returns(async ct => { await Task.Delay(Timeout.Infinite, ct); return null!; }); + + var elector = new LeaderElectorType(new(lockMock.Object) + { + LeaseDuration = TimeSpan.FromSeconds(1), + RenewDeadline = TimeSpan.FromMilliseconds(500), + RetryPeriod = TimeSpan.FromMilliseconds(100), + }); + + return new(queue, elector); + } + + /// + /// Exposes the private leadership callbacks for testing by invoking the delegates registered on the + /// elector, mirroring the approach used in the leader-aware resource watcher tests. + /// + private sealed class TestableService : LeaderAwareEntityQueueBackgroundService + { + private readonly LeaderElectorType _elector; + + public TestableService(CapturingQueue queue, LeaderElectorType elector) + : base( + new("test"), + Mock.Of(), + new OperatorSettingsBuilder { Namespace = "unit-test" }.Build(), + queue, + Mock.Of>(), + Mock.Of>>(), + elector) + { + _elector = elector; + } + + public void SimulateStartedLeading() => InvokeElectorEvent(nameof(LeaderElectorType.OnStartedLeading)); + + public void SimulateStoppedLeading() => InvokeElectorEvent(nameof(LeaderElectorType.OnStoppedLeading)); + + private void InvokeElectorEvent(string eventName) + { + var field = typeof(LeaderElectorType) + .GetField(eventName, BindingFlags.Instance | BindingFlags.NonPublic); + var handler = (Action?)field?.GetValue(_elector); + handler?.Invoke(); + } + } + + private sealed class CapturingQueue : ITimedEntityQueue + { + private readonly TaskCompletionSource _enumeratorStarted = new(TaskCreationOptions.RunContinuationsAsynchronously); + private readonly TaskCompletionSource _enumeratorCancelled = new(TaskCreationOptions.RunContinuationsAsynchronously); + + public CancellationToken CapturedToken { get; private set; } + + public Task EnumeratorStarted => _enumeratorStarted.Task; + + public Task EnumeratorCancelled => _enumeratorCancelled.Task; + + public Task Enqueue( + V1OperatorIntegrationTestEntity entity, + ReconciliationType type, + ReconciliationTriggerSource reconciliationTriggerSource, + TimeSpan queueIn, + int retryCount, + CancellationToken cancellationToken) => Task.CompletedTask; + + public async IAsyncEnumerator> GetAsyncEnumerator( + CancellationToken cancellationToken = default) + { + CapturedToken = cancellationToken; + var blocker = new TaskCompletionSource(); + await using var registration = cancellationToken.Register(() => + { + blocker.TrySetResult(); + _enumeratorCancelled.TrySetResult(); + }); + + _enumeratorStarted.TrySetResult(); + await blocker.Task; + yield break; + } + + public void Dispose() + { + } + } +}