Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
13 changes: 12 additions & 1 deletion docs/docs/operator/advanced-configuration.mdx
Original file line number Diff line number Diff line change
Expand Up @@ -129,6 +129,17 @@ builder.Services
retryPeriod: TimeSpan.FromSeconds(2)));
```

### Behavior on Leadership Loss

With `LeaderElectionType.Single`, both the resource watcher **and** the reconciliation queue are gated on leadership. When an instance loses its lease, it performs a **hard stop**: the queue processing loop and any in-flight reconciliations are cancelled immediately. This prevents a former leader from continuing to process resources — including timed requeues and error retries — after another instance has taken over.

Interrupting a running reconciliation is safe because KubeOps follows the standard Kubernetes model:

- **Optimistic concurrency** — concurrent writes to the same object are serialized by the API server via `resourceVersion`; a stale write fails with HTTP 409 Conflict.
- **Level-triggered, idempotent reconciliation** — a reconciler converges observed state towards desired state, so an interrupted reconciliation is simply re-run by the new leader against the current state. The lease timings (`LeaseDuration > RenewDeadline`) bound the short overlap window during a leadership transition.

This mirrors the behavior of the wider operator ecosystem. See the [Kubernetes leases documentation](https://kubernetes.io/docs/concepts/architecture/leases/) for details.

### Custom Leader Election

The `Custom` leader election type allows you to implement your own coordination logic, such as namespace-based leader election.
Expand Down Expand Up @@ -918,4 +929,4 @@ To completely replace how events are published, add your own `IEventPublisherFac
- Check queue permissions and quotas when using an external message broker as a reconciliation trigger
- Monitor message processing errors in external trigger consumers
- Ensure entities still exist before enqueuing — the Kubernetes API is the authoritative source of truth
- If externally triggered reconciliations are not running, confirm the background service is registered and started correctly
- If externally triggered reconciliations are not running, confirm the background service is registered and started correctly
11 changes: 10 additions & 1 deletion src/KubeOps.Operator/Builder/OperatorBuilder.cs
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,16 @@ public IOperatorBuilder AddController<TImplementation, TEntity>()
if (Settings.QueueStrategy == QueueStrategy.InMemory)
{
Services.TryAddSingleton<ITimedEntityQueue<TEntity>, TimedEntityQueue<TEntity>>();
Services.AddHostedService<EntityQueueBackgroundService<TEntity>>();

switch (Settings.LeaderElectionType)
{
case LeaderElectionType.None:
Services.AddHostedService<EntityQueueBackgroundService<TEntity>>();
break;
case LeaderElectionType.Single:
Services.AddHostedService<LeaderAwareEntityQueueBackgroundService<TEntity>>();
break;
}
}

// Leader Election
Expand Down
67 changes: 45 additions & 22 deletions src/KubeOps.Operator/Queue/EntityQueueBackgroundService{TEntity}.cs
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,7 @@ namespace KubeOps.Operator.Queue;
/// unbounded task accumulation.
/// </para>
/// </remarks>
internal sealed class EntityQueueBackgroundService<TEntity>(
public class EntityQueueBackgroundService<TEntity>(
ActivitySource activitySource,
IKubernetesClient client,
OperatorSettings operatorSettings,
Expand All @@ -68,12 +68,12 @@ internal sealed class EntityQueueBackgroundService<TEntity>(
ILogger<EntityQueueBackgroundService<TEntity>> logger) : IHostedService, IDisposable, IAsyncDisposable
where TEntity : IKubernetesObject<V1ObjectMeta>
{
private readonly CancellationTokenSource _cts = new();
private readonly ConcurrentDictionary<string, UidEntry> _uidEntries = new();
private readonly SemaphoreSlim _parallelismSemaphore = new(
operatorSettings.ParallelReconciliation.MaxParallelReconciliations,
operatorSettings.ParallelReconciliation.MaxParallelReconciliations);

private CancellationTokenSource _cts = new();
private volatile bool _disposed;

/// <inheritdoc cref="IHostedService.StartAsync"/>
Expand All @@ -83,8 +83,16 @@ internal sealed class EntityQueueBackgroundService<TEntity>(
/// cancellation is managed via an internal <see cref="CancellationTokenSource"/> that is signaled by <see cref="StopAsync"/>.
/// This avoids cancelling the scheduled work during the host startup phase.
/// </remarks>
public Task StartAsync(CancellationToken cancellationToken)
public virtual Task StartAsync(CancellationToken cancellationToken)
{
// Re-create the cancellation token source when it was cancelled by a previous StopAsync.
// This allows the processing loop to be restarted (e.g. when leadership is re-acquired).
if (_cts.IsCancellationRequested)
{
_cts.Dispose();
_cts = new();
}

// The current implementation of IHostedService expects that StartAsync is "really" asynchronous.
// Blocking calls are not allowed, they would stop the rest of the startup flow.
//
Expand All @@ -101,31 +109,16 @@ public Task StartAsync(CancellationToken cancellationToken)
}

/// <inheritdoc/>
public Task StopAsync(CancellationToken cancellationToken)
public virtual Task StopAsync(CancellationToken cancellationToken)
=> _disposed
? Task.CompletedTask
: _cts.CancelAsync();

/// <inheritdoc/>
public void Dispose()
{
_cts.Dispose();
_parallelismSemaphore.Dispose();

lock (_uidEntries)
{
foreach (var entry in _uidEntries.Values)
{
entry.Semaphore.Dispose();
}

_uidEntries.Clear();
}

client.Dispose();
queue.Dispose();

_disposed = true;
Dispose(true);
GC.SuppressFinalize(this);
}

/// <inheritdoc/>
Expand Down Expand Up @@ -159,7 +152,37 @@ static async ValueTask CastAndDispose(IDisposable resource)
}
}

private async Task<ReconciliationResult<TEntity>> ReconcileSingleAsync(QueueEntry<TEntity> entry, CancellationToken cancellationToken)
/// <summary>
/// Disposes the cancellation token source, the parallelism semaphore, every per-UID semaphore,
/// the Kubernetes client and the queue held by this service, then marks the service as disposed.
/// </summary>
/// <param name="disposing">
/// <see langword="true"/> when invoked from <see cref="Dispose()"/>; for any other value the
/// method returns without releasing anything.
/// </param>
protected virtual void Dispose(bool disposing)
{
    // Only the explicit dispose path releases resources.
    if (!disposing)
    {
        return;
    }

    // A second call is a no-op once the flag below has been set.
    if (_disposed)
    {
        return;
    }

    _cts.Dispose();
    _parallelismSemaphore.Dispose();

    lock (_uidEntries)
    {
        foreach (var uidEntry in _uidEntries.Values)
        {
            uidEntry.Semaphore.Dispose();
        }

        _uidEntries.Clear();
    }

    client.Dispose();
    queue.Dispose();

    // Set last, exactly as before, so StopAsync starts short-circuiting only after cleanup ran.
    _disposed = true;
}

protected virtual async Task<ReconciliationResult<TEntity>> ReconcileSingleAsync(QueueEntry<TEntity> entry, CancellationToken cancellationToken)
{
logger
.LogTrace(
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,120 @@
// Licensed to the .NET Foundation under one or more agreements.
// The .NET Foundation licenses this file to you under the Apache 2.0 License.
// See the LICENSE file in the project root for more information.

using System.Diagnostics;

using k8s;
using k8s.LeaderElection;
using k8s.Models;

using KubeOps.Abstractions.Builder;
using KubeOps.Abstractions.Reconciliation;
using KubeOps.KubernetesClient;

using Microsoft.Extensions.Logging;

namespace KubeOps.Operator.Queue;

/// <summary>
/// A leadership-aware variant of <see cref="EntityQueueBackgroundService{TEntity}"/>. The queue is only
/// consumed while this instance holds leadership; when leadership is lost the processing loop and any
/// in-flight reconciliations are cancelled immediately.
/// </summary>
/// <typeparam name="TEntity">The type of the Kubernetes entity being managed.</typeparam>
/// <remarks>
/// <para>
/// This service deliberately performs a <strong>hard stop</strong> on leadership loss: cancelling the
/// internal token aborts the dequeue loop as well as any reconciliation that is currently running. This
/// mirrors the behaviour of the wider Kubernetes operator ecosystem — controller-runtime
/// (Kubebuilder / Operator SDK) terminates the whole process when its lease is lost rather than draining
/// work gracefully.
/// </para>
/// <para>
/// Leader election does not guarantee strict mutual exclusion: clock skew, GC pauses or a slow API server
/// can leave an instance acting briefly after its lease has expired. That short transition overlap is
/// expected and is made safe by two properties the SDK already relies on:
/// </para>
/// <list type="bullet">
/// <item><description>
/// <strong>Optimistic concurrency</strong> — concurrent writes to the same object are serialised by the
/// API server via <c>metadata.resourceVersion</c>; a stale write fails with HTTP 409 Conflict.
/// </description></item>
/// <item><description>
/// <strong>Level-triggered, idempotent reconciliation</strong> — a reconciler converges observed state
/// towards desired state, so an interrupted reconciliation is simply re-run by the new leader against the
/// current (possibly partial) state. The lease timing (<c>LeaseDuration &gt; RenewDeadline</c>) bounds the
/// overlap window.
/// </description></item>
/// </list>
/// <para>
/// References:
/// <list type="bullet">
/// <item><description>https://kubernetes.io/docs/concepts/architecture/leases/</description></item>
/// <item><description>https://pkg.go.dev/sigs.k8s.io/controller-runtime/pkg/manager#Options</description></item>
/// </list>
/// </para>
/// </remarks>
public class LeaderAwareEntityQueueBackgroundService<TEntity>(
    ActivitySource activitySource,
    IKubernetesClient client,
    OperatorSettings operatorSettings,
    ITimedEntityQueue<TEntity> queue,
    IReconciler<TEntity> reconciler,
    ILogger<LeaderAwareEntityQueueBackgroundService<TEntity>> logger,
    LeaderElector elector)
    : EntityQueueBackgroundService<TEntity>(
        activitySource,
        client,
        operatorSettings,
        queue,
        reconciler,
        logger)
    where TEntity : IKubernetesObject<V1ObjectMeta>
{
    /// <summary>
    /// Subscribes to the leadership callbacks of the elector and starts queue processing
    /// right away only when this instance is already the leader.
    /// </summary>
    /// <param name="cancellationToken">Token forwarded to the base implementation when it is started.</param>
    /// <returns>
    /// The task of the base <c>StartAsync</c> when this instance is the leader; otherwise a completed task.
    /// </returns>
    public override Task StartAsync(CancellationToken cancellationToken)
    {
        logger.LogDebug("Subscribe for leadership updates.");

        elector.OnStartedLeading += StartedLeading;
        elector.OnStoppedLeading += StoppedLeading;

        return elector.IsLeader() ? base.StartAsync(cancellationToken) : Task.CompletedTask;
    }

    /// <summary>
    /// Unsubscribes from the leadership callbacks and stops queue processing.
    /// </summary>
    /// <param name="cancellationToken">Token forwarded to the base implementation.</param>
    /// <returns>The task returned by the base <c>StopAsync</c>.</returns>
    public override Task StopAsync(CancellationToken cancellationToken)
    {
        logger.LogDebug("Unsubscribe from leadership updates.");

        elector.OnStartedLeading -= StartedLeading;
        elector.OnStoppedLeading -= StoppedLeading;

        // Stop unconditionally. The handlers were just removed, so if the elector already reports
        // "not leader" while the loop is still running (the stopped-leading callback may not have
        // fired yet), nothing else would cancel it. The base StopAsync only cancels the internal
        // token (a no-op once disposed), and the base StartAsync re-creates a cancelled token, so
        // calling it while the loop is not running is harmless.
        return base.StopAsync(cancellationToken);
    }

    /// <summary>
    /// Removes the leadership callbacks before delegating to the base implementation.
    /// </summary>
    /// <param name="disposing">Whether the method is called from an explicit dispose.</param>
    protected override void Dispose(bool disposing)
    {
        if (disposing)
        {
            // Removing a handler that is not subscribed is a no-op, so this is safe after StopAsync.
            elector.OnStartedLeading -= StartedLeading;
            elector.OnStoppedLeading -= StoppedLeading;
        }

        base.Dispose(disposing);
    }

    // Leadership acquired: (re)start the processing loop. The returned task is intentionally not awaited
    // because the elector callback is synchronous.
    private void StartedLeading()
    {
        logger.LogInformation("This instance started leading, starting queue processing.");
        _ = base.StartAsync(CancellationToken.None);
    }

    // Leadership lost: stop the processing loop immediately.
    private void StoppedLeading()
    {
        logger.LogInformation("This instance stopped leading, stopping queue processing.");

        // Hard stop: cancelling the internal token aborts the dequeue loop and any in-flight
        // reconciliation. See the class remarks for why interrupting running reconciliations is safe.
        _ = base.StopAsync(CancellationToken.None);
    }
}
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,26 @@ public void Should_Register_TimedEntityQueue_And_BackgroundService_For_InMemory_
s.ImplementationType == typeof(EntityQueueBackgroundService<V1OperatorIntegrationTestEntity>));
}

[Fact]
public void Should_Register_LeaderAware_BackgroundService_For_Single_LeaderElection()
{
    // Arrange: in-memory queue strategy combined with single-instance leader election.
    var settings = new OperatorSettingsBuilder
    {
        QueueStrategy = QueueStrategy.InMemory,
        LeaderElectionType = LeaderElectionType.Single,
    }.Build();
    var builder = new OperatorBuilder(new ServiceCollection(), settings);

    // Act
    builder.AddController<TestController, V1OperatorIntegrationTestEntity>();

    // Assert: the leader-aware hosted service is registered and the plain one is not.
    var leaderAwareType = typeof(LeaderAwareEntityQueueBackgroundService<V1OperatorIntegrationTestEntity>);
    var plainType = typeof(EntityQueueBackgroundService<V1OperatorIntegrationTestEntity>);

    builder.Services.Should().Contain(s =>
        s.ServiceType == typeof(IHostedService) &&
        s.ImplementationType == leaderAwareType);
    builder.Services.Should().NotContain(s =>
        s.ServiceType == typeof(IHostedService) &&
        s.ImplementationType == plainType);
}

[Fact]
public void Should_Not_Register_TimedEntityQueue_Or_BackgroundService_For_Custom_Strategy()
{
Expand Down
Loading
Loading