diff --git a/KubeOps.slnx b/KubeOps.slnx index 8b63d967..7eea7962 100644 --- a/KubeOps.slnx +++ b/KubeOps.slnx @@ -4,6 +4,7 @@ + diff --git a/README.md b/README.md index e579aa07..5d986488 100644 --- a/README.md +++ b/README.md @@ -21,6 +21,7 @@ The documentation is also provided within the code itself (description of method - **Code Generation:** Includes Roslyn source generators and a CLI tool (`kubeops`) to automate boilerplate code for CRDs, controllers, and RBAC rules. - **Enhanced Kubernetes Client:** Provides convenience methods built on top of the official client library. - **Leader Election:** Automatic handling for high-availability operator deployments. +- **Metrics:** Built-in [OpenTelemetry](https://opentelemetry.io/) metrics for the reconciliation queue, reconciler, and watchers, exportable via any OpenTelemetry exporter. ## Getting Started diff --git a/docs/docs/operator/logging.mdx b/docs/docs/operator/logging.mdx index fb0d6ab6..569a9ffc 100644 --- a/docs/docs/operator/logging.mdx +++ b/docs/docs/operator/logging.mdx @@ -85,6 +85,11 @@ To enable scopes with OpenTelemetry, configure it as follows: The scope state must be an `IReadOnlyDictionary` to ensure correct serialization and inclusion in log entries. ::: +:::tip +Besides tracing, the operator also emits OpenTelemetry **metrics** for its reconciliation pipeline +(queue depth, reconciliation count/duration, watch events, and more). See [Metrics](./metrics). +::: + ## Tracing with `System.Diagnostics` and `ActivitySource` For [distributed tracing](https://learn.microsoft.com/en-us/dotnet/core/diagnostics/distributed-tracing-concepts), this project uses `System.Diagnostics` in combination with `ActivitySource`. 
diff --git a/docs/docs/operator/metrics.mdx b/docs/docs/operator/metrics.mdx new file mode 100644 index 00000000..7bd516c0 --- /dev/null +++ b/docs/docs/operator/metrics.mdx @@ -0,0 +1,147 @@ +--- +title: Metrics +description: OpenTelemetry metrics for the operator pipeline +sidebar_position: 6.5 +--- + +# Metrics + +KubeOps emits [OpenTelemetry](https://opentelemetry.io/) metrics for its reconciliation pipeline +through a [`Meter`](https://learn.microsoft.com/dotnet/core/diagnostics/metrics) named after the +operator (`OperatorSettings.Name`) — the same identifier used for the tracing `ActivitySource`. + +Collecting metrics is enabled by default and is virtually free when no exporter is attached. To +actually scrape the data you register an OpenTelemetry exporter for the meter. + +## Enabling / disabling + +Metrics collection is controlled by `OperatorSettings.EnableMetrics` (default `true`). Disable it via +the fluent builder: + +```csharp +builder.Services.AddKubernetesOperator(settings => settings + .WithMetrics(false)); +``` + +When disabled, the metrics infrastructure is not registered and the instrumentation in the watcher, +queue, and reconciler is skipped entirely. + +## Instruments + +All instruments carry a `kubeops.entity.type` tag (the watched entity's type name, e.g. `V1MyResource`). 
+ +| Name | Type | Unit | Additional tags | +|------|------|------|-----------------| +| `kubeops.operator.queue.depth` | ObservableGauge | `{items}` | `kubeops.queue.state` (`scheduled` \| `ready`) | +| `kubeops.operator.queue.enqueued` | Counter | `{items}` | `kubeops.trigger.source` (`api_server` \| `operator`) | +| `kubeops.operator.queue.requeued` | Counter | `{items}` | `kubeops.requeue.reason` (`conflict` \| `error_retry` \| `operator_requeue`) | +| `kubeops.operator.queue.discarded` | Counter | `{items}` | — | +| `kubeops.operator.reconciliation` | Counter | `{reconciliations}` | `kubeops.reconciliation.type` (`added` \| `modified` \| `deleted`), `kubeops.reconciliation.status` (`success` \| `failure`), `error.type` (on failure) | +| `kubeops.operator.reconciliation.duration` | Histogram | `s` | `kubeops.reconciliation.type`, `kubeops.reconciliation.status`, `error.type` (on failure) | +| `kubeops.operator.watcher.events` | Counter | `{events}` | `kubeops.watcher.event.type` (`added` \| `modified` \| `deleted` \| `bookmark`) | +| `kubeops.operator.watcher.reconnections` | Counter | `{reconnections}` | — | + +The `kubeops.operator.queue.depth` gauge reports two series: `scheduled` (entries waiting for a delayed +requeue) and `ready` (entries waiting to be picked up by the reconciliation loop). + +:::note +`kubeops.operator.queue.requeued` is a **subset** of `kubeops.operator.queue.enqueued`: every requeue (conflict, +error-retry, or operator requeue) also increments the enqueued counter. Do not add the two together +when building dashboards — use `requeued` for the per-reason breakdown of requeues only. + +The `kubeops.trigger.source` tag on `kubeops.operator.queue.enqueued` reflects the *original* event source. An +error-retry therefore keeps its original source (e.g. `api_server`) rather than `operator`; use +`kubeops.operator.queue.requeued{kubeops.requeue.reason="error_retry"}` to count retries explicitly. 
+::: + +:::note +The queue runs side-by-side with the watcher rather than strictly in front of the reconciler, so the +queue instruments give a good — but not exhaustive — view of throughput. See +[issue #1037](https://github.com/dotnet/dotnet-operator-sdk/issues/1037) for context. +::: + +The `error.type` attribute is only present on **failed** reconciliations and carries the failing +exception's full type name (or `_OTHER` when a reconciliation reports failure without an exception). +It follows the OpenTelemetry `error.type` convention and is bounded by the set of exception types your +controllers throw. + +The `kubeops.operator.reconciliation.duration` histogram uses second-scale bucket boundaries +(`5ms … 60s`) tuned for typical reconcile latencies, so `histogram_quantile()` over +`kubeops_operator_reconciliation_duration_seconds_bucket` yields meaningful percentiles out of the box. + +### Prometheus exposition names + +The instrument names above are the OpenTelemetry names. The Prometheus exporter translates them +(dots → underscores, `_total` suffix for counters, unit suffix for the histogram, UCUM annotation +units such as `{items}` dropped). 
The scrape endpoint therefore exposes: + +| OpenTelemetry instrument | Prometheus time series | +|---|---| +| `kubeops.operator.queue.depth` | `kubeops_operator_queue_depth` | +| `kubeops.operator.queue.enqueued` | `kubeops_operator_queue_enqueued_total` | +| `kubeops.operator.queue.requeued` | `kubeops_operator_queue_requeued_total` | +| `kubeops.operator.queue.discarded` | `kubeops_operator_queue_discarded_total` | +| `kubeops.operator.reconciliation` | `kubeops_operator_reconciliation_total` | +| `kubeops.operator.reconciliation.duration` | `kubeops_operator_reconciliation_duration_seconds` (`_bucket` / `_sum` / `_count`) | +| `kubeops.operator.watcher.events` | `kubeops_operator_watcher_events_total` | +| `kubeops.operator.watcher.reconnections` | `kubeops_operator_watcher_reconnections_total` | + +## Exposing a Prometheus endpoint (KubeOps.Operator.Web) + +Metrics export is configured through the standard OpenTelemetry pipeline, separate from the operator +registration chain. `KubeOps.Operator.Web` provides two helpers: `AddKubeOpsInstrumentation()` on the +`MeterProviderBuilder` subscribes to the operator's meter (the operator name is resolved from the +registered `OperatorSettings`, so you don't have to repeat it), and `MapOperatorMetricsEndpoint()` +exposes the Prometheus scraping endpoint: + +```csharp +var builder = WebApplication.CreateBuilder(args); + +builder.Services + .AddKubernetesOperator() + .RegisterComponents(); + +// NuGet: OpenTelemetry.Extensions.Hosting +builder.Services + .AddOpenTelemetry() + .WithMetrics(m => m + .AddKubeOpsInstrumentation() // subscribes to the operator meter + .AddPrometheusExporter()); + +var app = builder.Build(); +app.UseRouting(); +app.MapControllers(); +app.MapOperatorMetricsEndpoint(); // exposes GET /metrics + +app.Run(); +``` + +Pass the name explicitly with `AddKubeOpsInstrumentation(operatorName)` if `AddKubernetesOperator()` +is not called on the same service collection; the name is resolved when the meter provider is built, so call order does not matter. 
+ +## Manual exporter configuration + +Without `KubeOps.Operator.Web` you can register any OpenTelemetry exporter yourself. Add the meter by +the operator name (`== OperatorSettings.Name`) and pick an exporter: + +```csharp +// Standalone HttpListener (no ASP.NET Core) +// NuGet: OpenTelemetry.Exporter.Prometheus.HttpListener +.WithMetrics(m => m + .AddMeter(operatorName) + .AddPrometheusHttpListener(o => o.UriPrefixes = ["http://+:9464/"])); +// 9464 is the default port of the OpenTelemetry Prometheus exporter. +``` + +```csharp +// OTLP to an OpenTelemetry Collector +// NuGet: OpenTelemetry.Exporter.OpenTelemetryProtocol +.WithMetrics(m => m + .AddMeter(operatorName) + .AddOtlpExporter()); +``` + +:::tip +If you already use [.NET Aspire](./aspire) via `KubeOps.Aspire`, the meter is picked up automatically +by `AddKubeOpsServiceDefaults`, which configures OpenTelemetry with OTLP export. +::: diff --git a/examples/OtelOperator/Controller/V1OtelDemoEntityController.cs b/examples/OtelOperator/Controller/V1OtelDemoEntityController.cs new file mode 100644 index 00000000..b91da7e0 --- /dev/null +++ b/examples/OtelOperator/Controller/V1OtelDemoEntityController.cs @@ -0,0 +1,32 @@ +// Licensed to the .NET Foundation under one or more agreements. +// The .NET Foundation licenses this file to you under the Apache 2.0 License. +// See the LICENSE file in the project root for more information. 
+ +using k8s.Models; + +using KubeOps.Abstractions.Rbac; +using KubeOps.Abstractions.Reconciliation; +using KubeOps.Abstractions.Reconciliation.Controller; + +using OtelOperator.Entities; + +namespace OtelOperator.Controller; + +[EntityRbac(typeof(V1OtelDemoEntity), Verbs = RbacVerb.All)] +public sealed class V1OtelDemoEntityController(ILogger logger) + : IEntityController +{ + public Task> ReconcileAsync( + V1OtelDemoEntity entity, CancellationToken cancellationToken) + { + logger.LogInformation("Reconciling entity {Namespace}/{Name}.", entity.Namespace(), entity.Name()); + return Task.FromResult(ReconciliationResult.Success(entity)); + } + + public Task> DeletedAsync( + V1OtelDemoEntity entity, CancellationToken cancellationToken) + { + logger.LogInformation("Deleted entity {Namespace}/{Name}.", entity.Namespace(), entity.Name()); + return Task.FromResult(ReconciliationResult.Success(entity)); + } +} diff --git a/examples/OtelOperator/Entities/V1OtelDemoEntity.cs b/examples/OtelOperator/Entities/V1OtelDemoEntity.cs new file mode 100644 index 00000000..a1df1482 --- /dev/null +++ b/examples/OtelOperator/Entities/V1OtelDemoEntity.cs @@ -0,0 +1,18 @@ +// Licensed to the .NET Foundation under one or more agreements. +// The .NET Foundation licenses this file to you under the Apache 2.0 License. +// See the LICENSE file in the project root for more information. 
+ +using k8s.Models; + +using KubeOps.Abstractions.Entities; + +namespace OtelOperator.Entities; + +[KubernetesEntity(Group = "testing.dev", ApiVersion = "v1", Kind = "OtelDemoEntity")] +public sealed partial class V1OtelDemoEntity : CustomKubernetesEntity +{ + public sealed class EntitySpec + { + public string Message { get; set; } = string.Empty; + } +} diff --git a/examples/OtelOperator/OtelOperator.csproj b/examples/OtelOperator/OtelOperator.csproj new file mode 100644 index 00000000..97d5daad --- /dev/null +++ b/examples/OtelOperator/OtelOperator.csproj @@ -0,0 +1,22 @@ + + + + net9.0 + enable + enable + false + + + + + + + + + + + + + diff --git a/examples/OtelOperator/Program.cs b/examples/OtelOperator/Program.cs new file mode 100644 index 00000000..3f92b516 --- /dev/null +++ b/examples/OtelOperator/Program.cs @@ -0,0 +1,48 @@ +// Licensed to the .NET Foundation under one or more agreements. +// The .NET Foundation licenses this file to you under the Apache 2.0 License. +// See the LICENSE file in the project root for more information. + +using KubeOps.Abstractions.Builder; +using KubeOps.Operator; +using KubeOps.Operator.Web.Builder; + +using OpenTelemetry; +using OpenTelemetry.Metrics; +using OpenTelemetry.Trace; + +const string operatorName = "otel-operator"; + +var builder = WebApplication.CreateBuilder(args); + +// Operator registration only contains operator building blocks. +builder.Services + .AddKubernetesOperator(settings => settings.WithName(operatorName)) + .RegisterComponents(); + +// Observability is wired up separately through the standard OpenTelemetry pipeline. +// AddKubeOpsInstrumentation() subscribes to the operator's Meter and AddSource() to its +// ActivitySource (both named after OperatorSettings.Name). UseOtlpExporter() then exports +// every signal (metrics and traces) to an OTLP collector in a single, global call - +// configure the endpoint via the OTEL_EXPORTER_OTLP_ENDPOINT environment variable. 
+builder.Services + .AddOpenTelemetry() + .WithMetrics(metrics => metrics + .AddKubeOpsInstrumentation()) + .WithTracing(tracing => tracing + .AddSource(operatorName)) + .UseOtlpExporter(); + +var app = builder.Build(); + +app.UseRouting(); + +// Alternative to OTLP: expose a Prometheus scraping endpoint instead. Swap the metrics +// exporter for the Prometheus one and map the endpoint: +// +// .WithMetrics(metrics => metrics +// .AddKubeOpsInstrumentation() +// .AddPrometheusExporter()) +// +// app.MapOperatorMetricsEndpoint(); // exposes GET /metrics + +await app.RunAsync(); diff --git a/renovate.json b/renovate.json index 82413cdb..0a558e47 100644 --- a/renovate.json +++ b/renovate.json @@ -71,6 +71,13 @@ "matchPackageNames": ["Aspire.Hosting.Kubernetes"], "ignoreUnstable": false, "respectLatest": false + }, + { + "description": ["OpenTelemetry.Exporter.Prometheus.AspNetCore only ships prerelease versions, so allow unstable updates for it."], + "matchManagers": ["nuget"], + "matchPackageNames": ["OpenTelemetry.Exporter.Prometheus.AspNetCore"], + "ignoreUnstable": false, + "respectLatest": false } ] } \ No newline at end of file diff --git a/src/KubeOps.Abstractions/Builder/OperatorSettings.cs b/src/KubeOps.Abstractions/Builder/OperatorSettings.cs index 75560483..e1db2524 100644 --- a/src/KubeOps.Abstractions/Builder/OperatorSettings.cs +++ b/src/KubeOps.Abstractions/Builder/OperatorSettings.cs @@ -117,4 +117,15 @@ public sealed record OperatorSettings /// /// public required ParallelReconciliationSettings ParallelReconciliation { get; init; } + + /// + /// Indicates whether the operator collects OpenTelemetry metrics (queue, watcher, and + /// reconciliation instruments) via a Meter named + /// after Name. + /// + /// + /// Collecting metrics is virtually free when no listener/exporter is attached. To actually + /// scrape the metrics, register an OpenTelemetry exporter for the meter named after Name. 
+ /// + public required bool EnableMetrics { get; init; } } diff --git a/src/KubeOps.Abstractions/Builder/OperatorSettingsBuilder.cs b/src/KubeOps.Abstractions/Builder/OperatorSettingsBuilder.cs index 067b078b..369bdfc4 100644 --- a/src/KubeOps.Abstractions/Builder/OperatorSettingsBuilder.cs +++ b/src/KubeOps.Abstractions/Builder/OperatorSettingsBuilder.cs @@ -106,6 +106,13 @@ public sealed partial class OperatorSettingsBuilder /// public ParallelReconciliationSettingsBuilder ParallelReconciliation { get; set; } = new(); + /// + /// Indicates whether the operator collects OpenTelemetry metrics. Defaults to true. + /// Collecting is virtually free without an attached listener/exporter; set to false to + /// skip registering the metrics infrastructure entirely. + /// + public bool EnableMetrics { get; set; } = true; + /// /// Produces an immutable record from the current configuration. /// @@ -124,6 +131,7 @@ public sealed partial class OperatorSettingsBuilder AutoDetachFinalizers = AutoDetachFinalizers, ReconcileStrategy = ReconcileStrategy, ParallelReconciliation = ParallelReconciliation.Build(), + EnableMetrics = EnableMetrics, }; [GeneratedRegex(@"(\W|_)", RegexOptions.CultureInvariant)] diff --git a/src/KubeOps.Abstractions/Builder/OperatorSettingsBuilderExtensions.cs b/src/KubeOps.Abstractions/Builder/OperatorSettingsBuilderExtensions.cs index 32c5e070..4f085fb6 100644 --- a/src/KubeOps.Abstractions/Builder/OperatorSettingsBuilderExtensions.cs +++ b/src/KubeOps.Abstractions/Builder/OperatorSettingsBuilderExtensions.cs @@ -150,6 +150,17 @@ public static OperatorSettingsBuilder WithReconcileStrategy( return builder; } + /// Sets whether the operator collects OpenTelemetry metrics. + /// The builder to configure. + /// true to collect metrics (default); false to disable. + /// The same instance for chaining. 
+ public static OperatorSettingsBuilder WithMetrics( + this OperatorSettingsBuilder builder, bool value = true) + { + builder.EnableMetrics = value; + return builder; + } + /// Configures parallel reconciliation settings inline via a delegate. /// The builder to configure. /// An action that configures the . diff --git a/src/KubeOps.Aspire.Hosting/KubeOps.Aspire.Hosting.csproj b/src/KubeOps.Aspire.Hosting/KubeOps.Aspire.Hosting.csproj index 9f5d260a..2a959e4a 100644 --- a/src/KubeOps.Aspire.Hosting/KubeOps.Aspire.Hosting.csproj +++ b/src/KubeOps.Aspire.Hosting/KubeOps.Aspire.Hosting.csproj @@ -2,9 +2,6 @@ net8.0;net9.0;net10.0 - - $(NoWarn);NU5104 @@ -20,7 +17,9 @@ - + + diff --git a/src/KubeOps.Aspire/KubeOpsServiceDefaultsExtensions.cs b/src/KubeOps.Aspire/KubeOpsServiceDefaultsExtensions.cs index 839bc4dd..df88eb97 100644 --- a/src/KubeOps.Aspire/KubeOpsServiceDefaultsExtensions.cs +++ b/src/KubeOps.Aspire/KubeOpsServiceDefaultsExtensions.cs @@ -93,7 +93,11 @@ public static TBuilder ConfigureKubeOpsOpenTelemetry(this TBuilder bui .ConfigureResource(resource => resource.AddService(serviceName)) .WithMetrics(metrics => metrics .AddRuntimeInstrumentation() - .AddHttpClientInstrumentation()) + .AddHttpClientInstrumentation() + + // KubeOps records its operator metrics on a Meter named after the operator + // (OperatorSettings.Name); subscribe to it so they flow through Aspire/OTLP. + .AddMeter(serviceName)) .WithTracing(tracing => tracing .AddHttpClientInstrumentation() diff --git a/src/KubeOps.Operator.Web/Builder/MetricsExtensions.cs b/src/KubeOps.Operator.Web/Builder/MetricsExtensions.cs new file mode 100644 index 00000000..2fe2980f --- /dev/null +++ b/src/KubeOps.Operator.Web/Builder/MetricsExtensions.cs @@ -0,0 +1,71 @@ +// Licensed to the .NET Foundation under one or more agreements. +// The .NET Foundation licenses this file to you under the Apache 2.0 License. +// See the LICENSE file in the project root for more information. 
+ +using KubeOps.Abstractions.Builder; + +using Microsoft.AspNetCore.Builder; +using Microsoft.AspNetCore.Routing; +using Microsoft.Extensions.DependencyInjection; + +using OpenTelemetry.Metrics; + +namespace KubeOps.Operator.Web.Builder; + +/// +/// Convenience extensions that subscribe an OpenTelemetry meter provider to the operator's metrics +/// and expose the Prometheus scraping endpoint via ASP.NET Core. The operator records its metrics on +/// a Meter named after OperatorSettings.Name. +/// +public static class MetricsExtensions +{ + /// + /// Subscribes the meter provider to the operator's metrics. The operator name is resolved from + /// the registered OperatorSettings, so AddKubernetesOperator() must have run + /// on the same service collection. Use the overload that takes an operator name + /// to pass the name explicitly. + /// + /// The meter provider builder. + /// The builder for chaining. + public static MeterProviderBuilder AddKubeOpsInstrumentation(this MeterProviderBuilder builder) + { + ArgumentNullException.ThrowIfNull(builder); + + if (builder is IDeferredMeterProviderBuilder deferred) + { + return deferred.Configure((sp, b) => + b.AddMeter(sp.GetRequiredService().Name)); + } + + return builder; + } + + /// + /// Subscribes the meter provider to the operator's metrics using an explicit operator name. + /// + /// The meter provider builder. + /// The operator name (must equal OperatorSettings.Name). + /// The builder for chaining. + public static MeterProviderBuilder AddKubeOpsInstrumentation(this MeterProviderBuilder builder, string operatorName) + { + ArgumentNullException.ThrowIfNull(builder); + ArgumentException.ThrowIfNullOrWhiteSpace(operatorName); + + return builder.AddMeter(operatorName); + } + + /// + /// Maps the Prometheus scraping endpoint that exposes the operator's metrics. + /// + /// The endpoint route builder (e.g. the WebApplication). + /// The endpoint pattern. Defaults to /metrics. + /// The endpoint route builder for chaining. 
+ public static IEndpointRouteBuilder MapOperatorMetricsEndpoint( + this IEndpointRouteBuilder endpoints, string pattern = "/metrics") + { + ArgumentNullException.ThrowIfNull(endpoints); + + endpoints.MapPrometheusScrapingEndpoint(pattern); + return endpoints; + } +} diff --git a/src/KubeOps.Operator.Web/KubeOps.Operator.Web.csproj b/src/KubeOps.Operator.Web/KubeOps.Operator.Web.csproj index b5ff6c8b..f95c5186 100644 --- a/src/KubeOps.Operator.Web/KubeOps.Operator.Web.csproj +++ b/src/KubeOps.Operator.Web/KubeOps.Operator.Web.csproj @@ -27,6 +27,9 @@ + + \ No newline at end of file diff --git a/src/KubeOps.Operator/Builder/OperatorBuilder.cs b/src/KubeOps.Operator/Builder/OperatorBuilder.cs index bb4e3755..b7db4aaa 100644 --- a/src/KubeOps.Operator/Builder/OperatorBuilder.cs +++ b/src/KubeOps.Operator/Builder/OperatorBuilder.cs @@ -3,6 +3,7 @@ // See the LICENSE file in the project root for more information. using System.Diagnostics; +using System.Diagnostics.Metrics; using k8s; using k8s.Models; @@ -20,6 +21,7 @@ using KubeOps.Operator.Events; using KubeOps.Operator.Finalizer; using KubeOps.Operator.LeaderElection; +using KubeOps.Operator.Metrics; using KubeOps.Operator.Queue; using KubeOps.Operator.Reconciliation; using KubeOps.Operator.Watcher; @@ -124,7 +126,13 @@ private void AddOperatorBase() Services.AddSingleton(Settings); Services.AddSingleton(new ActivitySource(Settings.Name)); - // add and configure resource watcher entity cache + if (Settings.EnableMetrics) + { + Services.AddMetrics(); + Services.AddSingleton(sp => + new OperatorMetrics(sp.GetRequiredService(), Settings.Name)); + } + Services.WithResourceWatcherEntityCaching(Settings); // Add the default configuration and the client separately. 
This allows external users to override either diff --git a/src/KubeOps.Operator/Metrics/MetricTagExtensions.cs b/src/KubeOps.Operator/Metrics/MetricTagExtensions.cs new file mode 100644 index 00000000..54c61816 --- /dev/null +++ b/src/KubeOps.Operator/Metrics/MetricTagExtensions.cs @@ -0,0 +1,40 @@ +// Licensed to the .NET Foundation under one or more agreements. +// The .NET Foundation licenses this file to you under the Apache 2.0 License. +// See the LICENSE file in the project root for more information. + +using k8s; + +using KubeOps.Abstractions.Reconciliation; + +namespace KubeOps.Operator.Metrics; + +/// +/// Maps watch-event and reconciliation enums to stable, lower-case metric tag values; unmapped members fall back to their lower-cased name. +/// +internal static class MetricTagExtensions +{ + public static string ToMetricString(this WatchEventType type) => type switch + { + WatchEventType.Added => "added", + WatchEventType.Modified => "modified", + WatchEventType.Deleted => "deleted", + WatchEventType.Bookmark => "bookmark", + WatchEventType.Error => "error", + _ => type.ToString().ToLowerInvariant(), + }; + + public static string ToMetricString(this ReconciliationTriggerSource source) => source switch + { + ReconciliationTriggerSource.ApiServer => "api_server", + ReconciliationTriggerSource.Operator => "operator", + _ => source.ToString().ToLowerInvariant(), + }; + + public static string ToMetricString(this ReconciliationType type) => type switch + { + ReconciliationType.Added => "added", + ReconciliationType.Modified => "modified", + ReconciliationType.Deleted => "deleted", + _ => type.ToString().ToLowerInvariant(), + }; +} diff --git a/src/KubeOps.Operator/Metrics/OperatorMetrics.cs b/src/KubeOps.Operator/Metrics/OperatorMetrics.cs new file mode 100644 index 00000000..440c84d3 --- /dev/null +++ b/src/KubeOps.Operator/Metrics/OperatorMetrics.cs @@ -0,0 +1,202 @@ +// Licensed to the .NET Foundation under one or more agreements. +// The .NET Foundation licenses this file to you under the Apache 2.0 License. 
+// See the LICENSE file in the project root for more information. + +using System.Collections.Concurrent; +using System.Diagnostics; +using System.Diagnostics.Metrics; + +namespace KubeOps.Operator.Metrics; + +/// +/// Owns the operator's OpenTelemetry instruments. A single instance is registered as a singleton +/// and shared across all entity types; the kubeops.entity.type tag distinguishes measurements per +/// watched resource. +/// +/// +/// +/// The underlying Meter is named after the operator (see OperatorSettings.Name), +/// matching the ActivitySource name so a single identifier configures +/// both tracing and metrics. Recording is virtually free when no listener/exporter is attached. +/// +/// +/// The Meter is created through and owned by the IMeterFactory; it is +/// therefore disposed by the factory (the DI container) and intentionally not disposed here. +/// +/// +public sealed class OperatorMetrics +{ + private const string EntityTypeTag = "kubeops.entity.type"; + + private readonly Counter _queueEnqueued; + private readonly Counter _queueRequeued; + private readonly Counter _queueDiscarded; + private readonly Counter _reconciliationTotal; + private readonly Histogram _reconciliationDuration; + private readonly Counter _watcherEvents; + private readonly Counter _watcherReconnections; + + // Depth providers per entity type, observed by a single shared gauge. Using one instrument for + // all entity types (rather than one per closed generic queue) avoids duplicate-instrument + // registration warnings from the OpenTelemetry SDK. + private readonly ConcurrentDictionary _queueDepthProviders = new(); + + /// + /// Initializes a new instance of the OperatorMetrics class. + /// + /// The factory used to create the underlying Meter. + /// The meter name; should match the operator name. 
+ public OperatorMetrics(IMeterFactory meterFactory, string meterName) + { + ArgumentNullException.ThrowIfNull(meterFactory); + + var meter = meterFactory.Create(new MeterOptions(meterName) + { + Version = typeof(OperatorMetrics).Assembly.GetName().Version?.ToString(), + }); + + meter.CreateObservableGauge( + "kubeops.operator.queue.depth", + ObserveQueueDepth, + "{items}", + "Current number of entities in the queue, split by scheduled and ready state."); + + _queueEnqueued = meter.CreateCounter( + "kubeops.operator.queue.enqueued", + "{items}", + "Total number of entities enqueued for reconciliation."); + _queueRequeued = meter.CreateCounter( + "kubeops.operator.queue.requeued", + "{items}", + "Total number of entities requeued (conflict, error-retry, or operator requeue)."); + _queueDiscarded = meter.CreateCounter( + "kubeops.operator.queue.discarded", + "{items}", + "Total number of reconciliation requests discarded due to a locking conflict."); + _reconciliationTotal = meter.CreateCounter( + "kubeops.operator.reconciliation", + "{reconciliations}", + "Total number of reconciliations executed."); + _reconciliationDuration = meter.CreateHistogram( + "kubeops.operator.reconciliation.duration", + "s", + "Duration of a single reconciliation, including the entity fetch.", + advice: new InstrumentAdvice + { + // Reconciliations are typically sub-second to a few seconds. The OpenTelemetry default + // buckets are sized for milliseconds, so override them with second-scale boundaries to + // get usable latency percentiles. 
+ HistogramBucketBoundaries = [0.005, 0.01, 0.025, 0.05, 0.1, 0.25, 0.5, 1, 2.5, 5, 10, 30, 60], + }); + _watcherEvents = meter.CreateCounter( + "kubeops.operator.watcher.events", + "{events}", + "Total number of Kubernetes watch events received."); + _watcherReconnections = meter.CreateCounter( + "kubeops.operator.watcher.reconnections", + "{reconnections}", + "Total number of watcher reconnection attempts after an error."); + } + + /// Records that an entity was enqueued. + /// The watched entity type name. + /// The trigger source (api_server or operator). + public void RecordEnqueue(string entityType, string triggerSource) + => _queueEnqueued.Add( + 1, + new TagList { { EntityTypeTag, entityType }, { "kubeops.trigger.source", triggerSource } }); + + /// Records that an entity was requeued. + /// The watched entity type name. + /// The requeue reason (conflict, error_retry, or operator_requeue). + public void RecordRequeue(string entityType, string reason) + => _queueRequeued.Add( + 1, + new TagList { { EntityTypeTag, entityType }, { "kubeops.requeue.reason", reason } }); + + /// Records that a reconciliation request was discarded due to a locking conflict. + /// The watched entity type name. + public void RecordDiscard(string entityType) + => _queueDiscarded.Add(1, new TagList { { EntityTypeTag, entityType } }); + + /// Records a completed reconciliation and its duration. + /// The watched entity type name. + /// The reconciliation type (added, modified, or deleted). + /// The outcome (success or failure). + /// The reconciliation duration in seconds. + /// + /// For failed reconciliations, a low-cardinality classification of the error following the + /// OpenTelemetry error.type convention (typically the exception type's full name). + /// Pass null for successful reconciliations; any non-null value is recorded as the error.type tag. + /// + public void RecordReconciliation( + string entityType, string reconciliationType, string status, double durationSeconds, string? 
errorType = null) + { + var tags = new TagList + { + { EntityTypeTag, entityType }, + { "kubeops.reconciliation.type", reconciliationType }, + { "kubeops.reconciliation.status", status }, + }; + + if (errorType is not null) + { + tags.Add("error.type", errorType); + } + + _reconciliationTotal.Add(1, tags); + _reconciliationDuration.Record(durationSeconds, tags); + } + + /// Records a received watch event. + /// The watched entity type name. + /// The watch event type (added, modified, deleted, or bookmark). + public void RecordWatchEvent(string entityType, string eventType) + => _watcherEvents.Add( + 1, + new TagList { { EntityTypeTag, entityType }, { "kubeops.watcher.event.type", eventType } }); + + /// Records a watcher reconnection attempt. + /// The watched entity type name. + public void RecordWatcherReconnection(string entityType) + => _watcherReconnections.Add(1, new TagList { { EntityTypeTag, entityType } }); + + /// + /// Registers a depth provider for the given entity type. All providers are observed by a single + /// shared kubeops.operator.queue.depth gauge that emits one measurement per entity type and + /// kubeops.queue.state (scheduled = delayed, not yet ready; ready = ready to reconcile). + /// + /// The watched entity type name. + /// A callback returning the number of scheduled (delayed) entries. + /// A callback returning the number of ready entries. + public void RegisterQueueDepthGauge(string entityType, Func scheduledDepth, Func readyDepth) + => _queueDepthProviders[entityType] = new QueueDepthProvider(scheduledDepth, readyDepth); + + private IEnumerable> ObserveQueueDepth() + { + foreach (var (entityType, provider) in _queueDepthProviders) + { + int scheduled, ready; + try + { + scheduled = provider.ScheduledDepth(); + ready = provider.ReadyDepth(); + } + catch (ObjectDisposedException) + { + // The queue backing this provider was torn down (e.g. during shutdown). 
Skip it so a + // single disposed queue does not abort the observation for all other entity types. + continue; + } + + yield return new Measurement( + scheduled, + new TagList { { EntityTypeTag, entityType }, { "kubeops.queue.state", "scheduled" } }); + yield return new Measurement( + ready, + new TagList { { EntityTypeTag, entityType }, { "kubeops.queue.state", "ready" } }); + } + } + + private sealed record QueueDepthProvider(Func ScheduledDepth, Func ReadyDepth); +} diff --git a/src/KubeOps.Operator/Queue/EntityQueueBackgroundService{TEntity}.cs b/src/KubeOps.Operator/Queue/EntityQueueBackgroundService{TEntity}.cs index ee22f2cf..5c1bc4ce 100644 --- a/src/KubeOps.Operator/Queue/EntityQueueBackgroundService{TEntity}.cs +++ b/src/KubeOps.Operator/Queue/EntityQueueBackgroundService{TEntity}.cs @@ -12,6 +12,7 @@ using KubeOps.Abstractions.Reconciliation; using KubeOps.KubernetesClient; using KubeOps.Operator.Logging; +using KubeOps.Operator.Metrics; using Microsoft.Extensions.Hosting; using Microsoft.Extensions.Logging; @@ -65,7 +66,8 @@ internal sealed class EntityQueueBackgroundService( OperatorSettings operatorSettings, ITimedEntityQueue queue, IReconciler reconciler, - ILogger> logger) : IHostedService, IDisposable, IAsyncDisposable + ILogger> logger, + OperatorMetrics? metrics = null) : IHostedService, IDisposable, IAsyncDisposable where TEntity : IKubernetesObject { private readonly CancellationTokenSource _cts = new(); @@ -272,6 +274,7 @@ private async Task ProcessEntryAsync(QueueEntry entry, CancellationToke await uidEntry.Semaphore.WaitAsync(cancellationToken); } + var stopwatch = Stopwatch.StartNew(); try { logger @@ -281,6 +284,13 @@ private async Task ProcessEntryAsync(QueueEntry entry, CancellationToke var result = await ReconcileSingleAsync(entry, cancellationToken); + metrics?.RecordReconciliation( + typeof(TEntity).Name, + entry.ReconciliationType.ToMetricString(), + result.IsSuccess ? 
"success" : "failure", + stopwatch.Elapsed.TotalSeconds, + result.IsSuccess ? null : result.Error?.GetType().FullName ?? "_OTHER"); + logger .LogInformation( """Completed reconciliation for "{Identifier}" {State}.""", @@ -289,6 +299,16 @@ private async Task ProcessEntryAsync(QueueEntry entry, CancellationToke ? "successfully" : "with failures"); } + catch (Exception e) when (e is not OperationCanceledException || !cancellationToken.IsCancellationRequested) + { + metrics?.RecordReconciliation( + typeof(TEntity).Name, + entry.ReconciliationType.ToMetricString(), + "failure", + stopwatch.Elapsed.TotalSeconds, + e.GetType().FullName); + throw; + } finally { uidEntry.Semaphore.Release(); @@ -335,6 +355,8 @@ await queue.Enqueue( delay, nextRetryCount, CancellationToken.None); + + metrics?.RecordRequeue(typeof(TEntity).Name, "error_retry"); } else { @@ -365,6 +387,7 @@ private async Task HandleLockingConflictAsync(QueueEntry entry, Cancell .LogDebug( """Entity "{Identifier}" is already being reconciled. Discarding request.""", entry.Entity.ToIdentifierString()); + metrics?.RecordDiscard(typeof(TEntity).Name); break; case ParallelReconciliationConflictStrategy.RequeueAfterDelay: @@ -382,6 +405,8 @@ await queue.Enqueue( requeueDelay, retryCount: 0, cancellationToken); + + metrics?.RecordRequeue(typeof(TEntity).Name, "conflict"); break; default: diff --git a/src/KubeOps.Operator/Queue/TimedEntityQueue.cs b/src/KubeOps.Operator/Queue/TimedEntityQueue.cs index a1055819..b75b0a00 100644 --- a/src/KubeOps.Operator/Queue/TimedEntityQueue.cs +++ b/src/KubeOps.Operator/Queue/TimedEntityQueue.cs @@ -9,6 +9,7 @@ using KubeOps.Abstractions.Reconciliation; using KubeOps.Operator.Logging; +using KubeOps.Operator.Metrics; using Microsoft.Extensions.Logging; @@ -41,6 +42,7 @@ public sealed class TimedEntityQueue : ITimedEntityQueue private const int TimerIntervalMilliseconds = 100; private readonly ILogger> _logger; + private readonly OperatorMetrics? 
_metrics; // Used for managing all scheduled entries that should be added to the queue in the future. private readonly ConcurrentDictionary> _management = new(); @@ -57,12 +59,23 @@ public sealed class TimedEntityQueue : ITimedEntityQueue // Task that runs the timer loop. private readonly Task _timerTask; - private bool _disposed; + // Read by the meter's observation thread (queue-depth gauge) and written on the dispose thread, + // hence volatile to ensure the disposed state is observed promptly across threads. + private volatile bool _disposed; - public TimedEntityQueue(ILogger> logger) + public TimedEntityQueue(ILogger> logger, OperatorMetrics? metrics = null) { _logger = logger; + _metrics = metrics; _timerTask = Task.Run(ProcessScheduledEntriesAsync); + + // The gauge callbacks are invoked by the (long-lived) meter for its whole lifetime, which + // outlives this queue. After Dispose() the BlockingCollection would throw + // ObjectDisposedException on Count, so the callbacks short-circuit once disposed. + _metrics?.RegisterQueueDepthGauge( + typeof(TEntity).Name, + () => _disposed ? 0 : _management.Count, + () => _disposed ? 
0 : _queue.Count); } internal int Count => _management.Count; @@ -119,6 +132,8 @@ public Task Enqueue(TEntity entity, ReconciliationType type, ReconciliationTrigg return new(entity, newReconciliationType, reconciliationTriggerSource, newQueueIn, retryCount); }); + _metrics?.RecordEnqueue(typeof(TEntity).Name, reconciliationTriggerSource.ToMetricString()); + return Task.CompletedTask; } diff --git a/src/KubeOps.Operator/Reconciliation/Reconciler.cs b/src/KubeOps.Operator/Reconciliation/Reconciler.cs index c1843a50..1e88949a 100644 --- a/src/KubeOps.Operator/Reconciliation/Reconciler.cs +++ b/src/KubeOps.Operator/Reconciliation/Reconciler.cs @@ -11,6 +11,7 @@ using KubeOps.Abstractions.Reconciliation.Finalizer; using KubeOps.KubernetesClient; using KubeOps.Operator.Logging; +using KubeOps.Operator.Metrics; using KubeOps.Operator.Queue; using Microsoft.Extensions.DependencyInjection; @@ -37,7 +38,8 @@ internal sealed class Reconciler( IServiceProvider serviceProvider, OperatorSettings operatorSettings, ITimedEntityQueue entityQueue, - IKubernetesClient client) + IKubernetesClient client, + OperatorMetrics? 
metrics = null) : IReconciler where TEntity : IKubernetesObject { @@ -69,6 +71,8 @@ await entityQueue queueIn: result.RequeueAfter.Value, retryCount: 0, cancellationToken); + + metrics?.RecordRequeue(typeof(TEntity).Name, "operator_requeue"); } return result; diff --git a/src/KubeOps.Operator/Watcher/LeaderAwareResourceWatcher{TEntity}.cs b/src/KubeOps.Operator/Watcher/LeaderAwareResourceWatcher{TEntity}.cs index f1101e39..72e9ca33 100644 --- a/src/KubeOps.Operator/Watcher/LeaderAwareResourceWatcher{TEntity}.cs +++ b/src/KubeOps.Operator/Watcher/LeaderAwareResourceWatcher{TEntity}.cs @@ -11,6 +11,7 @@ using KubeOps.Abstractions.Builder; using KubeOps.Abstractions.Entities; using KubeOps.KubernetesClient; +using KubeOps.Operator.Metrics; using KubeOps.Operator.Queue; using Microsoft.Extensions.Hosting; @@ -30,7 +31,8 @@ public class LeaderAwareResourceWatcher( IEntityFieldSelector fieldSelector, IKubernetesClient client, IHostApplicationLifetime hostApplicationLifetime, - LeaderElector elector) + LeaderElector elector, + OperatorMetrics? 
metrics = null) : ResourceWatcher( activitySource, logger, @@ -39,7 +41,8 @@ public class LeaderAwareResourceWatcher( settings, labelSelector, fieldSelector, - client) + client, + metrics) where TEntity : IKubernetesObject { private CancellationTokenSource _cts = new(); diff --git a/src/KubeOps.Operator/Watcher/ResourceWatcher{TEntity}.cs b/src/KubeOps.Operator/Watcher/ResourceWatcher{TEntity}.cs index 1f5a9c68..c9807b70 100644 --- a/src/KubeOps.Operator/Watcher/ResourceWatcher{TEntity}.cs +++ b/src/KubeOps.Operator/Watcher/ResourceWatcher{TEntity}.cs @@ -16,6 +16,7 @@ using KubeOps.KubernetesClient; using KubeOps.Operator.Constants; using KubeOps.Operator.Logging; +using KubeOps.Operator.Metrics; using KubeOps.Operator.Queue; using KubeOps.Operator.Reconciliation; using KubeOps.Operator.Retry; @@ -35,7 +36,8 @@ public class ResourceWatcher( OperatorSettings settings, IEntityLabelSelector labelSelector, IEntityFieldSelector fieldSelector, - IKubernetesClient client) + IKubernetesClient client, + OperatorMetrics? 
metrics = null) : IHostedService, IAsyncDisposable, IDisposable where TEntity : IKubernetesObject { @@ -299,6 +301,8 @@ private async Task WatchClientEventsAsync(CancellationToken stoppingToken) using var activity = activitySource.StartActivity($"""processing "{type}" event""", ActivityKind.Consumer); using var scope = logger.BeginScope(EntityLoggingScope.CreateFor(type, entity)); + metrics?.RecordWatchEvent(typeof(TEntity).Name, type.ToMetricString()); + logger .LogInformation( """Received watch event "{EventType}" for "{Identifier}", last observed resource version: {ResourceVersion}.""", @@ -379,6 +383,7 @@ e.InnerException is EndOfStreamException && logger.LogError(e, """There was an error while watching the resource "{Resource}".""", typeof(TEntity)); _watcherReconnectRetries++; + metrics?.RecordWatcherReconnection(typeof(TEntity).Name); var delay = ExponentialRetryBackoff.GetDelayWithJitter(_watcherReconnectRetries); logger.LogWarning( diff --git a/test/KubeOps.Operator.Test/Metrics/OperatorMetrics.Test.cs b/test/KubeOps.Operator.Test/Metrics/OperatorMetrics.Test.cs new file mode 100644 index 00000000..24de971c --- /dev/null +++ b/test/KubeOps.Operator.Test/Metrics/OperatorMetrics.Test.cs @@ -0,0 +1,249 @@ +// Licensed to the .NET Foundation under one or more agreements. +// The .NET Foundation licenses this file to you under the Apache 2.0 License. +// See the LICENSE file in the project root for more information. 
+ +using System.Diagnostics.Metrics; + +using FluentAssertions; + +using KubeOps.Operator.Metrics; + +namespace KubeOps.Operator.Test.Metrics; + +[Trait("Area", "Otel")] +public sealed class OperatorMetricsTest +{ + private const string MeterName = "test-operator"; + + [Fact] + public void RecordEnqueue_increments_counter_with_tags() + { + using var harness = new MetricHarness(); + + harness.Metrics.RecordEnqueue("V1Secret", "api_server"); + + var measurement = harness.LongMeasurements.Should().ContainSingle().Subject; + measurement.Instrument.Should().Be("kubeops.operator.queue.enqueued"); + measurement.Value.Should().Be(1); + measurement.Tags.Should().Contain("kubeops.entity.type", "V1Secret"); + measurement.Tags.Should().Contain("kubeops.trigger.source", "api_server"); + } + + [Theory] + [InlineData("conflict")] + [InlineData("error_retry")] + [InlineData("operator_requeue")] + public void RecordRequeue_increments_counter_with_reason(string reason) + { + using var harness = new MetricHarness(); + + harness.Metrics.RecordRequeue("V1Secret", reason); + + var measurement = harness.LongMeasurements.Should().ContainSingle().Subject; + measurement.Instrument.Should().Be("kubeops.operator.queue.requeued"); + measurement.Tags.Should().Contain("kubeops.requeue.reason", reason); + } + + [Fact] + public void RecordDiscard_increments_counter() + { + using var harness = new MetricHarness(); + + harness.Metrics.RecordDiscard("V1Secret"); + + harness.LongMeasurements.Should().ContainSingle() + .Which.Instrument.Should().Be("kubeops.operator.queue.discarded"); + } + + [Fact] + public void RecordReconciliation_records_count_and_duration() + { + using var harness = new MetricHarness(); + + harness.Metrics.RecordReconciliation("V1Secret", "modified", "success", 1.5); + + harness.LongMeasurements.Should().ContainSingle() + .Which.Instrument.Should().Be("kubeops.operator.reconciliation"); + + var duration = harness.DoubleMeasurements.Should().ContainSingle().Subject; + 
duration.Instrument.Should().Be("kubeops.operator.reconciliation.duration"); + duration.Value.Should().Be(1.5); + duration.Tags.Should().Contain("kubeops.reconciliation.type", "modified"); + duration.Tags.Should().Contain("kubeops.reconciliation.status", "success"); + } + + [Fact] + public void RecordReconciliation_failure_adds_error_type_tag() + { + using var harness = new MetricHarness(); + + harness.Metrics.RecordReconciliation("V1Secret", "added", "failure", 0.2, "System.TimeoutException"); + + var measurement = harness.LongMeasurements.Should().ContainSingle().Subject; + measurement.Tags.Should().Contain("kubeops.reconciliation.status", "failure"); + measurement.Tags.Should().Contain("error.type", "System.TimeoutException"); + } + + [Fact] + public void RecordReconciliation_success_omits_error_type_tag() + { + using var harness = new MetricHarness(); + + harness.Metrics.RecordReconciliation("V1Secret", "added", "success", 0.2); + + harness.LongMeasurements.Should().ContainSingle() + .Which.Tags.Should().NotContainKey("error.type"); + } + + [Fact] + public void QueueDepthGauge_reports_multiple_entity_types_on_single_instrument() + { + using var harness = new MetricHarness(); + + harness.Metrics.RegisterQueueDepthGauge("V1Secret", () => 1, () => 2); + harness.Metrics.RegisterQueueDepthGauge("V1ConfigMap", () => 3, () => 4); + harness.Listener.RecordObservableInstruments(); + + var depth = harness.IntMeasurements + .Where(m => m.Instrument == "kubeops.operator.queue.depth") + .ToList(); + + // 2 entity types x 2 states, all on the single shared instrument. 
+ depth.Should().HaveCount(4); + depth.Select(m => (string?)m.Tags["kubeops.entity.type"]).Distinct().Should() + .BeEquivalentTo("V1Secret", "V1ConfigMap"); + } + + [Fact] + public void QueueDepthGauge_skips_disposed_provider_without_breaking_others() + { + using var harness = new MetricHarness(); + + harness.Metrics.RegisterQueueDepthGauge( + "V1Disposed", + () => throw new ObjectDisposedException("queue"), + () => throw new ObjectDisposedException("queue")); + harness.Metrics.RegisterQueueDepthGauge("V1Healthy", () => 7, () => 8); + harness.Listener.RecordObservableInstruments(); + + var depth = harness.IntMeasurements + .Where(m => m.Instrument == "kubeops.operator.queue.depth") + .ToList(); + + // Only the healthy provider's two measurements are reported; the disposed one is skipped. + depth.Should().HaveCount(2); + depth.Should().OnlyContain(m => (string?)m.Tags["kubeops.entity.type"] == "V1Healthy"); + } + + [Fact] + public void RecordWatchEvent_and_reconnection_increment_counters() + { + using var harness = new MetricHarness(); + + harness.Metrics.RecordWatchEvent("V1Secret", "Added"); + harness.Metrics.RecordWatcherReconnection("V1Secret"); + + harness.LongMeasurements.Select(m => m.Instrument).Should() + .BeEquivalentTo("kubeops.operator.watcher.events", "kubeops.operator.watcher.reconnections"); + } + + [Fact] + public void QueueDepthGauge_reports_scheduled_and_ready() + { + using var harness = new MetricHarness(); + + harness.Metrics.RegisterQueueDepthGauge("V1Secret", () => 3, () => 5); + harness.Listener.RecordObservableInstruments(); + + var depthMeasurements = harness.IntMeasurements + .Where(m => m.Instrument == "kubeops.operator.queue.depth") + .ToList(); + + depthMeasurements.Should().HaveCount(2); + depthMeasurements.Should().ContainSingle(m => + m.Value == 3 && (string?)m.Tags["kubeops.queue.state"] == "scheduled"); + depthMeasurements.Should().ContainSingle(m => + m.Value == 5 && (string?)m.Tags["kubeops.queue.state"] == "ready"); + } + + 
private sealed record CapturedMeasurement(string Instrument, T Value, IReadOnlyDictionary Tags); + + private sealed class TestMeterFactory : IMeterFactory + { + private readonly List _meters = []; + + public Meter Create(MeterOptions options) + { + var meter = new Meter(options); + _meters.Add(meter); + return meter; + } + + public void Dispose() + { + foreach (var meter in _meters) + { + meter.Dispose(); + } + + _meters.Clear(); + } + } + + private sealed class MetricHarness : IDisposable + { + private readonly TestMeterFactory _factory = new(); + + public MetricHarness() + { + Listener = new MeterListener + { + InstrumentPublished = (instrument, listener) => + { + if (instrument.Meter.Name == MeterName) + { + listener.EnableMeasurementEvents(instrument); + } + }, + }; + + Listener.SetMeasurementEventCallback((instrument, value, tags, _) => + LongMeasurements.Add(new(instrument.Name, value, ToDictionary(tags)))); + Listener.SetMeasurementEventCallback((instrument, value, tags, _) => + DoubleMeasurements.Add(new(instrument.Name, value, ToDictionary(tags)))); + Listener.SetMeasurementEventCallback((instrument, value, tags, _) => + IntMeasurements.Add(new(instrument.Name, value, ToDictionary(tags)))); + + Listener.Start(); + + Metrics = new OperatorMetrics(_factory, MeterName); + } + + public OperatorMetrics Metrics { get; } + + public MeterListener Listener { get; } + + public List> LongMeasurements { get; } = []; + + public List> DoubleMeasurements { get; } = []; + + public List> IntMeasurements { get; } = []; + + public void Dispose() + { + Listener.Dispose(); + _factory.Dispose(); + } + + private static IReadOnlyDictionary ToDictionary(ReadOnlySpan> tags) + { + var dictionary = new Dictionary(tags.Length); + foreach (var tag in tags) + { + dictionary[tag.Key] = tag.Value; + } + + return dictionary; + } + } +} diff --git a/test/KubeOps.Operator.Test/Queue/EntityQueueBackgroundService.Test.cs b/test/KubeOps.Operator.Test/Queue/EntityQueueBackgroundService.Test.cs 
index 9ff53286..384d344f 100644 --- a/test/KubeOps.Operator.Test/Queue/EntityQueueBackgroundService.Test.cs +++ b/test/KubeOps.Operator.Test/Queue/EntityQueueBackgroundService.Test.cs @@ -2,6 +2,8 @@ // The .NET Foundation licenses this file to you under the Apache 2.0 License. // See the LICENSE file in the project root for more information. +using System.Diagnostics.Metrics; + using FluentAssertions; using k8s.Models; @@ -9,8 +11,10 @@ using KubeOps.Abstractions.Builder; using KubeOps.Abstractions.Reconciliation; using KubeOps.KubernetesClient; +using KubeOps.Operator.Metrics; using KubeOps.Operator.Queue; +using Microsoft.Extensions.DependencyInjection; using Microsoft.Extensions.Logging; using Moq; @@ -77,7 +81,8 @@ private static EntityQueueBackgroundService CreateService( Mock> reconcilerMock, Mock clientMock, V1ConfigMap? entity, - OperatorSettings? settings = null) + OperatorSettings? settings = null, + OperatorMetrics? metrics = null) { var effectiveSettings = settings ?? new OperatorSettingsBuilder().Build(); @@ -94,7 +99,84 @@ private static EntityQueueBackgroundService CreateService( effectiveSettings, queue, reconcilerMock.Object, - Mock.Of>>()); + Mock.Of>>(), + metrics); + } + + [Trait("Area", "Otel")] + [Fact] + public async Task Throwing_Reconciler_Records_Failure_Reconciliation_Metric() + { + const string meterName = "test-failure-metrics"; + using var meterFactory = new ServiceCollection().AddMetrics().BuildServiceProvider() + .GetRequiredService(); + var metrics = new OperatorMetrics(meterFactory, meterName); + + var captured = new List<(string Status, string? ErrorType)>(); + using var listener = new MeterListener + { + InstrumentPublished = (instrument, l) => + { + if (instrument.Meter.Name == meterName && instrument.Name == "kubeops.operator.reconciliation") + { + l.EnableMeasurementEvents(instrument); + } + }, + }; + listener.SetMeasurementEventCallback((_, _, tags, _) => + { + string? status = null; + string? 
errorType = null; + foreach (var tag in tags) + { + if (tag.Key == "kubeops.reconciliation.status") + { + status = tag.Value as string; + } + else if (tag.Key == "error.type") + { + errorType = tag.Value as string; + } + } + + lock (captured) + { + captured.Add((status!, errorType)); + } + }); + listener.Start(); + + var queue = new ControllableQueue(); + var reconcilerMock = new Mock>(); + var clientMock = new Mock(); + var entity = CreateEntity(); + + reconcilerMock + .Setup(r => r.Reconcile( + It.IsAny>(), + It.IsAny())) + .ThrowsAsync(new InvalidOperationException("boom")); + + // No retries so the failure path resolves deterministically to a single dropped attempt. + var settings = new OperatorSettingsBuilder() + .WithParallelReconciliation(p => p.MaxErrorRetries = 0) + .Build(); + + await using var service = CreateService(queue, reconcilerMock, clientMock, entity, settings, metrics); + await service.StartAsync(TestContext.Current.CancellationToken); + + queue.Push(entity, ReconciliationType.Added, ReconciliationTriggerSource.ApiServer); + queue.Complete(); + + await Task.Delay(300, TestContext.Current.CancellationToken); + await service.StopAsync(TestContext.Current.CancellationToken); + + lock (captured) + { + captured.Should().ContainSingle(); + captured[0].Status.Should().Be("failure"); + captured[0].ErrorType.Should().Be("System.InvalidOperationException"); + } } [Fact] diff --git a/test/KubeOps.Operator.Web.Test/KubeOps.Operator.Web.Test.csproj b/test/KubeOps.Operator.Web.Test/KubeOps.Operator.Web.Test.csproj index 404af4c5..0f5245de 100644 --- a/test/KubeOps.Operator.Web.Test/KubeOps.Operator.Web.Test.csproj +++ b/test/KubeOps.Operator.Web.Test/KubeOps.Operator.Web.Test.csproj @@ -9,6 +9,8 @@ + + diff --git a/test/KubeOps.Operator.Web.Test/Metrics/MetricsExtensions.Test.cs b/test/KubeOps.Operator.Web.Test/Metrics/MetricsExtensions.Test.cs new file mode 100644 index 00000000..97ee02c8 --- /dev/null +++ 
b/test/KubeOps.Operator.Web.Test/Metrics/MetricsExtensions.Test.cs @@ -0,0 +1,66 @@ +// Licensed to the .NET Foundation under one or more agreements. +// The .NET Foundation licenses this file to you under the Apache 2.0 License. +// See the LICENSE file in the project root for more information. + +using System.Diagnostics.Metrics; + +using FluentAssertions; + +using KubeOps.Abstractions.Builder; +using KubeOps.Operator.Web.Builder; + +using Microsoft.Extensions.DependencyInjection; + +using OpenTelemetry; +using OpenTelemetry.Metrics; + +namespace KubeOps.Operator.Web.Test.Metrics; + +[Trait("Area", "Otel")] +public sealed class MetricsExtensionsTest +{ + private const string OperatorName = "test-operator"; + + [Fact] + public void Should_Subscribe_Meter_Resolved_From_Settings() + { + var exported = new List(); + var services = new ServiceCollection(); + services.AddSingleton(new OperatorSettingsBuilder { Name = OperatorName }.Build()); + services.AddOpenTelemetry() + .WithMetrics(metrics => metrics + .AddKubeOpsInstrumentation() + .AddInMemoryExporter(exported)); + + using var provider = services.BuildServiceProvider(); + var meterProvider = provider.GetRequiredService(); + + using var meter = new Meter(OperatorName); + meter.CreateCounter("test.counter").Add(1); + + meterProvider.ForceFlush(); + + exported.Should().Contain(m => m.Name == "test.counter"); + } + + [Fact] + public void Should_Subscribe_Meter_By_Explicit_Name() + { + var exported = new List(); + var services = new ServiceCollection(); + services.AddOpenTelemetry() + .WithMetrics(metrics => metrics + .AddKubeOpsInstrumentation(OperatorName) + .AddInMemoryExporter(exported)); + + using var provider = services.BuildServiceProvider(); + var meterProvider = provider.GetRequiredService(); + + using var meter = new Meter(OperatorName); + meter.CreateCounter("test.counter").Add(1); + + meterProvider.ForceFlush(); + + exported.Should().Contain(m => m.Name == "test.counter"); + } +}