From 331f24422acc34d918e4a3b47c695926f2d0addc Mon Sep 17 00:00:00 2001 From: ermakov-oleg Date: Fri, 13 Feb 2026 15:36:08 +0100 Subject: [PATCH] feat: add Prometheus metrics for backup/recovery window Signed-off-by: ermakov-oleg --- internal/cnpgi/instance/identity.go | 7 + internal/cnpgi/instance/metrics.go | 168 ++++++++++++++++++++++++ internal/cnpgi/instance/metrics_test.go | 117 +++++++++++++++++ internal/cnpgi/instance/start.go | 4 + internal/cnpgi/instance/suite_test.go | 13 ++ 5 files changed, 309 insertions(+) create mode 100644 internal/cnpgi/instance/metrics.go create mode 100644 internal/cnpgi/instance/metrics_test.go create mode 100644 internal/cnpgi/instance/suite_test.go diff --git a/internal/cnpgi/instance/identity.go b/internal/cnpgi/instance/identity.go index 6ecaf0ed..4d63a882 100644 --- a/internal/cnpgi/instance/identity.go +++ b/internal/cnpgi/instance/identity.go @@ -61,6 +61,13 @@ func (i IdentityImplementation) GetPluginCapabilities( }, }, }, + { + Type: &identity.PluginCapability_Service_{ + Service: &identity.PluginCapability_Service{ + Type: identity.PluginCapability_Service_TYPE_METRICS, + }, + }, + }, }, }, nil } diff --git a/internal/cnpgi/instance/metrics.go b/internal/cnpgi/instance/metrics.go new file mode 100644 index 00000000..2044e423 --- /dev/null +++ b/internal/cnpgi/instance/metrics.go @@ -0,0 +1,168 @@ +/* +Copyright The CloudNativePG Contributors +Copyright 2025, Opera Norway AS + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package instance + +import ( + "context" + "errors" + "fmt" + + "github.com/cloudnative-pg/cnpg-i/pkg/metrics" + "github.com/cloudnative-pg/machinery/pkg/log" + "sigs.k8s.io/controller-runtime/pkg/client" + + pgbackrestv1 "github.com/operasoftware/cnpg-plugin-pgbackrest/api/v1" + "github.com/operasoftware/cnpg-plugin-pgbackrest/internal/cnpgi/operator/config" + "github.com/operasoftware/cnpg-plugin-pgbackrest/internal/pgbackrest/catalog" + pgbackrestCommand "github.com/operasoftware/cnpg-plugin-pgbackrest/internal/pgbackrest/command" + pgbackrestCredentials "github.com/operasoftware/cnpg-plugin-pgbackrest/internal/pgbackrest/credentials" + "github.com/operasoftware/cnpg-plugin-pgbackrest/internal/pgbackrest/utils" +) + +const ( + firstRecoverabilityPointMetric = "cnpg_pgbackrest_first_recoverability_point" + lastAvailableBackupTimestampMetric = "cnpg_pgbackrest_last_available_backup_timestamp" +) + +// MetricsServiceImplementation is the implementation of the Metrics Service +type MetricsServiceImplementation struct { + metrics.UnimplementedMetricsServer + Client client.Client +} + +// GetCapabilities implements the MetricsServer interface +func (m MetricsServiceImplementation) GetCapabilities( + _ context.Context, + _ *metrics.MetricsCapabilitiesRequest, +) (*metrics.MetricsCapabilitiesResult, error) { + return &metrics.MetricsCapabilitiesResult{ + Capabilities: []*metrics.MetricsCapability{ + { + Type: &metrics.MetricsCapability_Rpc{ + Rpc: &metrics.MetricsCapability_RPC{ + Type: metrics.MetricsCapability_RPC_TYPE_METRICS, + }, + }, + }, + }, + }, nil +} + +// Define implements the MetricsServer interface +func (m MetricsServiceImplementation) Define( + _ context.Context, + _ *metrics.DefineMetricsRequest, +) (*metrics.DefineMetricsResult, error) { + return &metrics.DefineMetricsResult{ + Metrics: []*metrics.Metric{ + { + FqName: firstRecoverabilityPointMetric, + Help: "The first point of recoverability for pgBackRest as a unix timestamp", + ValueType: &metrics.MetricType{ + Type: metrics.MetricType_TYPE_GAUGE, + }, + }, + { + FqName: lastAvailableBackupTimestampMetric, + Help: "The last available backup timestamp for pgBackRest as a unix timestamp", + ValueType: &metrics.MetricType{ + Type: metrics.MetricType_TYPE_GAUGE, + }, + }, + }, + }, nil +} + +// Collect implements the MetricsServer interface +func (m MetricsServiceImplementation) Collect( + ctx context.Context, + request *metrics.CollectMetricsRequest, +) (*metrics.CollectMetricsResult, error) { + contextLogger := log.FromContext(ctx) + + configuration, err := config.NewFromClusterJSON(request.ClusterDefinition) + if err != nil { + return nil, fmt.Errorf("while parsing cluster definition: %w", err) + } + + if configuration.PgbackrestObjectName == "" { + contextLogger.Debug("No pgbackrest archive configured, skipping metrics collection") + return &metrics.CollectMetricsResult{}, nil + } + + var archive pgbackrestv1.Archive + if err := m.Client.Get(ctx, configuration.GetArchiveObjectKey(), &archive); err != nil { + return nil, fmt.Errorf("while getting archive object: %w", err) + } + + env, err := pgbackrestCredentials.EnvSetBackupCloudCredentials( + ctx, + m.Client, + archive.Namespace, + &archive.Spec.Configuration, + utils.SanitizedEnviron()) + if err != nil { + return nil, fmt.Errorf("while getting credentials: %w", err) + } + + backupCatalog, err := pgbackrestCommand.GetBackupList(ctx, &archive.Spec.Configuration, configuration.Stanza, env) + if err != nil { + contextLogger.Error(err, "while getting backup list for metrics") + return &metrics.CollectMetricsResult{ + Metrics: []*metrics.CollectMetric{ + {FqName: firstRecoverabilityPointMetric, Value: 0}, + {FqName: lastAvailableBackupTimestampMetric, Value: 0}, + }, + }, nil + } + + result := &metrics.CollectMetricsResult{} + + firstRecoverability, lastBackup, err := getRecoveryWindow(backupCatalog) + if err != nil { + contextLogger.Debug("No backup data available for metrics", "error", err) + result.Metrics = append(result.Metrics, + &metrics.CollectMetric{FqName: firstRecoverabilityPointMetric, Value: 0}, + &metrics.CollectMetric{FqName: lastAvailableBackupTimestampMetric, Value: 0}, + ) + } else { + result.Metrics = append(result.Metrics, + &metrics.CollectMetric{FqName: firstRecoverabilityPointMetric, Value: float64(firstRecoverability)}, + &metrics.CollectMetric{FqName: lastAvailableBackupTimestampMetric, Value: float64(lastBackup)}, + ) + } + + return result, nil +} + +// getRecoveryWindow extracts first recoverability point and last backup timestamp +// from the backup catalog. Returns unix timestamps. +func getRecoveryWindow(backupCatalog *catalog.Catalog) (firstRecoverability, lastBackup int64, err error) { + if backupCatalog == nil || len(backupCatalog.Backups) == 0 { + return 0, 0, errors.New("no backups found") + } + + firstPoint := backupCatalog.FirstRecoverabilityPoint() + lastPoint := backupCatalog.GetLastSuccessfulBackupTime() + + if firstPoint == nil || lastPoint == nil { + return 0, 0, errors.New("no successful backups found") + } + + return firstPoint.Unix(), lastPoint.Unix(), nil +} diff --git a/internal/cnpgi/instance/metrics_test.go b/internal/cnpgi/instance/metrics_test.go new file mode 100644 index 00000000..f78126de --- /dev/null +++ b/internal/cnpgi/instance/metrics_test.go @@ -0,0 +1,117 @@ +package instance + +import ( + "github.com/operasoftware/cnpg-plugin-pgbackrest/internal/pgbackrest/catalog" + + . "github.com/onsi/ginkgo/v2" + . "github.com/onsi/gomega" +) + +var _ = Describe("getRecoveryWindow", func() { + It("returns error for nil catalog", func() { + _, _, err := getRecoveryWindow(nil) + Expect(err).To(HaveOccurred()) + Expect(err.Error()).To(Equal("no backups found")) + }) + + It("returns error for empty catalog", func() { + c := &catalog.Catalog{} + _, _, err := getRecoveryWindow(c) + Expect(err).To(HaveOccurred()) + Expect(err.Error()).To(Equal("no backups found")) + }) + + It("returns correct timestamps for a single completed backup", func() { + c := &catalog.Catalog{ + Backups: []catalog.PgbackrestBackup{ + { + Time: catalog.PgbackrestBackupTime{Start: 1000, Stop: 2000}, + WAL: catalog.PgbackrestBackupWALArchive{Start: "000000010000000000000001", Stop: "000000010000000000000002"}, + }, + }, + } + first, last, err := getRecoveryWindow(c) + Expect(err).NotTo(HaveOccurred()) + Expect(first).To(Equal(int64(2000))) + Expect(last).To(Equal(int64(2000))) + }) + + It("returns first recoverability from first backup and last from last backup", func() { + c := &catalog.Catalog{ + Backups: []catalog.PgbackrestBackup{ + { + Time: catalog.PgbackrestBackupTime{Start: 1000, Stop: 2000}, + WAL: catalog.PgbackrestBackupWALArchive{Start: "000000010000000000000001", Stop: "000000010000000000000002"}, + }, + { + Time: catalog.PgbackrestBackupTime{Start: 3000, Stop: 4000}, + WAL: catalog.PgbackrestBackupWALArchive{Start: "000000010000000000000003", Stop: "000000010000000000000004"}, + }, + { + Time: catalog.PgbackrestBackupTime{Start: 5000, Stop: 6000}, + WAL: catalog.PgbackrestBackupWALArchive{Start: "000000010000000000000005", Stop: "000000010000000000000006"}, + }, + }, + } + first, last, err := getRecoveryWindow(c) + Expect(err).NotTo(HaveOccurred()) + Expect(first).To(Equal(int64(2000))) + Expect(last).To(Equal(int64(6000))) + }) + + It("skips errored backups with Start=0", func() { + c := &catalog.Catalog{ + Backups: []catalog.PgbackrestBackup{ + { + Time: catalog.PgbackrestBackupTime{Start: 0, Stop: 500}, + WAL: catalog.PgbackrestBackupWALArchive{Start: "000000010000000000000001", Stop: "000000010000000000000002"}, + }, + { + Time: catalog.PgbackrestBackupTime{Start: 1000, Stop: 2000}, + WAL: catalog.PgbackrestBackupWALArchive{Start: "000000010000000000000003", Stop: "000000010000000000000004"}, + }, + }, + } + first, last, err := getRecoveryWindow(c) + Expect(err).NotTo(HaveOccurred()) + Expect(first).To(Equal(int64(2000))) + Expect(last).To(Equal(int64(2000))) + }) + + It("skips errored backups with Stop=0", func() { + c := &catalog.Catalog{ + Backups: []catalog.PgbackrestBackup{ + { + Time: catalog.PgbackrestBackupTime{Start: 1000, Stop: 0}, + WAL: catalog.PgbackrestBackupWALArchive{Start: "000000010000000000000001", Stop: "000000010000000000000002"}, + }, + { + Time: catalog.PgbackrestBackupTime{Start: 3000, Stop: 4000}, + WAL: catalog.PgbackrestBackupWALArchive{Start: "000000010000000000000003", Stop: "000000010000000000000004"}, + }, + }, + } + first, last, err := getRecoveryWindow(c) + Expect(err).NotTo(HaveOccurred()) + Expect(first).To(Equal(int64(4000))) + Expect(last).To(Equal(int64(4000))) + }) + + It("returns error when all backups are errored", func() { + c := &catalog.Catalog{ + Backups: []catalog.PgbackrestBackup{ + { + Time: catalog.PgbackrestBackupTime{Start: 0, Stop: 0}, + WAL: catalog.PgbackrestBackupWALArchive{Start: "000000010000000000000001", Stop: "000000010000000000000002"}, + }, + { + Time: catalog.PgbackrestBackupTime{Start: 1000, Stop: 0}, + WAL: catalog.PgbackrestBackupWALArchive{Start: "000000010000000000000003", Stop: "000000010000000000000004"}, + }, + }, + } + _, _, err := getRecoveryWindow(c) + Expect(err).To(HaveOccurred()) + Expect(err.Error()).To(Equal("no successful backups found")) + }) +}) diff --git a/internal/cnpgi/instance/start.go b/internal/cnpgi/instance/start.go index 6ed820b7..3aa13112 100644 --- a/internal/cnpgi/instance/start.go +++ b/internal/cnpgi/instance/start.go @@ -22,6 +22,7 @@ import ( "github.com/cloudnative-pg/cnpg-i-machinery/pkg/pluginhelper/http" "github.com/cloudnative-pg/cnpg-i/pkg/backup" + "github.com/cloudnative-pg/cnpg-i/pkg/metrics" "github.com/cloudnative-pg/cnpg-i/pkg/wal" "google.golang.org/grpc" "sigs.k8s.io/controller-runtime/pkg/client" @@ -55,6 +56,9 @@ func (c *CNPGI) Start(ctx context.Context) error { InstanceName: c.InstanceName, PGDataPath: c.PGDataPath, }) + metrics.RegisterMetricsServer(server, MetricsServiceImplementation{ + Client: c.Client, + }) common.AddHealthCheck(server) return nil } diff --git a/internal/cnpgi/instance/suite_test.go b/internal/cnpgi/instance/suite_test.go new file mode 100644 index 00000000..09cf7bf3 --- /dev/null +++ b/internal/cnpgi/instance/suite_test.go @@ -0,0 +1,13 @@ +package instance + +import ( + "testing" + + . "github.com/onsi/ginkgo/v2" + . "github.com/onsi/gomega" +) + +func TestInstance(t *testing.T) { + RegisterFailHandler(Fail) + RunSpecs(t, "Instance Suite") +}