Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 7 additions & 0 deletions internal/cnpgi/instance/identity.go
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,13 @@ func (i IdentityImplementation) GetPluginCapabilities(
},
},
},
{
Type: &identity.PluginCapability_Service_{
Service: &identity.PluginCapability_Service{
Type: identity.PluginCapability_Service_TYPE_METRICS,
},
},
},
},
}, nil
}
Expand Down
168 changes: 168 additions & 0 deletions internal/cnpgi/instance/metrics.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,168 @@
/*
Copyright The CloudNativePG Contributors
Copyright 2025, Opera Norway AS

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package instance

import (
"context"
"errors"
"fmt"

"github.com/cloudnative-pg/cnpg-i/pkg/metrics"
"github.com/cloudnative-pg/machinery/pkg/log"
"sigs.k8s.io/controller-runtime/pkg/client"

pgbackrestv1 "github.com/operasoftware/cnpg-plugin-pgbackrest/api/v1"
"github.com/operasoftware/cnpg-plugin-pgbackrest/internal/cnpgi/operator/config"
"github.com/operasoftware/cnpg-plugin-pgbackrest/internal/pgbackrest/catalog"
pgbackrestCommand "github.com/operasoftware/cnpg-plugin-pgbackrest/internal/pgbackrest/command"
pgbackrestCredentials "github.com/operasoftware/cnpg-plugin-pgbackrest/internal/pgbackrest/credentials"
"github.com/operasoftware/cnpg-plugin-pgbackrest/internal/pgbackrest/utils"
)

const (
	// firstRecoverabilityPointMetric is the fully-qualified name of the gauge
	// that Define declares and Collect fills with the first recoverability
	// point, expressed as a unix timestamp.
	firstRecoverabilityPointMetric = "cnpg_pgbackrest_first_recoverability_point"

	// lastAvailableBackupTimestampMetric is the fully-qualified name of the
	// gauge that Define declares and Collect fills with the time of the last
	// available backup, expressed as a unix timestamp.
	lastAvailableBackupTimestampMetric = "cnpg_pgbackrest_last_available_backup_timestamp"
)

// MetricsServiceImplementation is the implementation of the Metrics Service.
// Its methods declare the pgBackRest backup gauges (Define) and compute their
// current values from the backup catalog (Collect).
type MetricsServiceImplementation struct {
	// NOTE(review): presumably supplies default implementations for any
	// MetricsServer methods not defined on this type — confirm against cnpg-i.
	metrics.UnimplementedMetricsServer

	// Client is the Kubernetes client Collect uses to read the Archive object;
	// it is also handed to the credentials helper that builds the pgBackRest
	// environment.
	Client client.Client
}

// GetCapabilities implements the MetricsServer interface.
//
// Both arguments are ignored: the answer is static and always consists of a
// single RPC capability of type TYPE_METRICS. The returned error is always nil.
func (m MetricsServiceImplementation) GetCapabilities(
	_ context.Context,
	_ *metrics.MetricsCapabilitiesRequest,
) (*metrics.MetricsCapabilitiesResult, error) {
	// The only capability this service advertises.
	metricsRPC := &metrics.MetricsCapability{
		Type: &metrics.MetricsCapability_Rpc{
			Rpc: &metrics.MetricsCapability_RPC{
				Type: metrics.MetricsCapability_RPC_TYPE_METRICS,
			},
		},
	}

	capabilities := &metrics.MetricsCapabilitiesResult{
		Capabilities: []*metrics.MetricsCapability{metricsRPC},
	}
	return capabilities, nil
}

// Define implements the MetricsServer interface.
//
// It declares the two gauges this plugin exposes, in a fixed order: the first
// recoverability point and the last available backup timestamp. Both arguments
// are ignored and the returned error is always nil.
func (m MetricsServiceImplementation) Define(
	_ context.Context,
	_ *metrics.DefineMetricsRequest,
) (*metrics.DefineMetricsResult, error) {
	// Every metric defined here is a gauge, so only name and help text vary.
	newGauge := func(fqName, help string) *metrics.Metric {
		return &metrics.Metric{
			FqName:    fqName,
			Help:      help,
			ValueType: &metrics.MetricType{Type: metrics.MetricType_TYPE_GAUGE},
		}
	}

	definitions := []*metrics.Metric{
		newGauge(
			firstRecoverabilityPointMetric,
			"The first point of recoverability for pgBackRest as a unix timestamp",
		),
		newGauge(
			lastAvailableBackupTimestampMetric,
			"The last available backup timestamp for pgBackRest as a unix timestamp",
		),
	}

	return &metrics.DefineMetricsResult{Metrics: definitions}, nil
}

// Collect implements the MetricsServer interface.
//
// The plugin configuration is parsed from request.ClusterDefinition. When no
// pgBackRest archive object name is configured, an empty result is returned.
// Otherwise the Archive object is read through m.Client, the pgBackRest
// environment is built from its credentials, and the backup list is queried.
//
// Failures while parsing the cluster definition, reading the Archive object or
// resolving credentials are returned as wrapped errors. A failure to list the
// backups, or a catalog without a usable recovery window, is only logged: in
// those cases both gauges are reported with the value 0 and the error is nil.
func (m MetricsServiceImplementation) Collect(
	ctx context.Context,
	request *metrics.CollectMetricsRequest,
) (*metrics.CollectMetricsResult, error) {
	logger := log.FromContext(ctx)

	// Both gauges are always emitted together, first recoverability point
	// first, so every reporting path shares this builder.
	report := func(first, last float64) *metrics.CollectMetricsResult {
		return &metrics.CollectMetricsResult{
			Metrics: []*metrics.CollectMetric{
				{FqName: firstRecoverabilityPointMetric, Value: first},
				{FqName: lastAvailableBackupTimestampMetric, Value: last},
			},
		}
	}

	pluginConfig, err := config.NewFromClusterJSON(request.ClusterDefinition)
	if err != nil {
		return nil, fmt.Errorf("while parsing cluster definition: %w", err)
	}
	if pluginConfig.PgbackrestObjectName == "" {
		logger.Debug("No pgbackrest archive configured, skipping metrics collection")
		return &metrics.CollectMetricsResult{}, nil
	}

	archive := pgbackrestv1.Archive{}
	if getErr := m.Client.Get(ctx, pluginConfig.GetArchiveObjectKey(), &archive); getErr != nil {
		return nil, fmt.Errorf("while getting archive object: %w", getErr)
	}

	pgbackrestEnv, err := pgbackrestCredentials.EnvSetBackupCloudCredentials(
		ctx,
		m.Client,
		archive.Namespace,
		&archive.Spec.Configuration,
		utils.SanitizedEnviron(),
	)
	if err != nil {
		return nil, fmt.Errorf("while getting credentials: %w", err)
	}

	backups, err := pgbackrestCommand.GetBackupList(
		ctx,
		&archive.Spec.Configuration,
		pluginConfig.Stanza,
		pgbackrestEnv,
	)
	if err != nil {
		// Best effort: a failing backup listing must not fail the whole
		// collection, so zeroed gauges are reported instead of the error.
		logger.Error(err, "while getting backup list for metrics")
		return report(0, 0), nil
	}

	first, last, err := getRecoveryWindow(backups)
	if err != nil {
		logger.Debug("No backup data available for metrics", "error", err)
		return report(0, 0), nil
	}

	return report(float64(first), float64(last)), nil
}

// getRecoveryWindow derives the recovery window from the backup catalog.
//
// It returns the catalog's first recoverability point and the time of its last
// successful backup, each converted with Unix(). An error is returned when the
// catalog is nil or holds no backups ("no backups found"), or when the catalog
// reports no value for either of the two points ("no successful backups
// found"); both timestamps are 0 in that case.
func getRecoveryWindow(backupCatalog *catalog.Catalog) (firstRecoverability, lastBackup int64, err error) {
	if backupCatalog == nil || len(backupCatalog.Backups) == 0 {
		return 0, 0, errors.New("no backups found")
	}

	earliest, latest := backupCatalog.FirstRecoverabilityPoint(), backupCatalog.GetLastSuccessfulBackupTime()
	if earliest != nil && latest != nil {
		return earliest.Unix(), latest.Unix(), nil
	}

	// At least one of the two points is missing, so no window can be reported.
	return 0, 0, errors.New("no successful backups found")
}
117 changes: 117 additions & 0 deletions internal/cnpgi/instance/metrics_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,117 @@
package instance

import (
"github.com/operasoftware/cnpg-plugin-pgbackrest/internal/pgbackrest/catalog"

. "github.com/onsi/ginkgo/v2"
. "github.com/onsi/gomega"
)

// Specs for getRecoveryWindow: error reporting for missing or unusable
// catalogs, and the timestamps returned for catalogs with completed backups.
var _ = Describe("getRecoveryWindow", func() {
	// WAL segment names used as fixture data. They are untyped constants so
	// they can be assigned to the WAL fields exactly like inline literals.
	const (
		wal1 = "000000010000000000000001"
		wal2 = "000000010000000000000002"
		wal3 = "000000010000000000000003"
		wal4 = "000000010000000000000004"
		wal5 = "000000010000000000000005"
		wal6 = "000000010000000000000006"
	)

	// catalogOf wraps the given backups, in the order supplied, in a catalog.
	catalogOf := func(backups ...catalog.PgbackrestBackup) *catalog.Catalog {
		return &catalog.Catalog{Backups: backups}
	}

	// expectWindow asserts that the call succeeds with exactly these timestamps.
	// The offset makes a failure point at the calling spec, not at this helper.
	expectWindow := func(c *catalog.Catalog, wantFirst, wantLast int64) {
		first, last, err := getRecoveryWindow(c)
		ExpectWithOffset(1, err).NotTo(HaveOccurred())
		ExpectWithOffset(1, first).To(Equal(wantFirst))
		ExpectWithOffset(1, last).To(Equal(wantLast))
	}

	// expectFailure asserts that the call fails with exactly this message.
	expectFailure := func(c *catalog.Catalog, wantMessage string) {
		_, _, err := getRecoveryWindow(c)
		ExpectWithOffset(1, err).To(HaveOccurred())
		ExpectWithOffset(1, err.Error()).To(Equal(wantMessage))
	}

	It("returns error for nil catalog", func() {
		expectFailure(nil, "no backups found")
	})

	It("returns error for empty catalog", func() {
		expectFailure(&catalog.Catalog{}, "no backups found")
	})

	It("returns correct timestamps for a single completed backup", func() {
		single := catalogOf(
			catalog.PgbackrestBackup{
				Time: catalog.PgbackrestBackupTime{Start: 1000, Stop: 2000},
				WAL:  catalog.PgbackrestBackupWALArchive{Start: wal1, Stop: wal2},
			},
		)
		expectWindow(single, 2000, 2000)
	})

	It("returns first recoverability from first backup and last from last backup", func() {
		three := catalogOf(
			catalog.PgbackrestBackup{
				Time: catalog.PgbackrestBackupTime{Start: 1000, Stop: 2000},
				WAL:  catalog.PgbackrestBackupWALArchive{Start: wal1, Stop: wal2},
			},
			catalog.PgbackrestBackup{
				Time: catalog.PgbackrestBackupTime{Start: 3000, Stop: 4000},
				WAL:  catalog.PgbackrestBackupWALArchive{Start: wal3, Stop: wal4},
			},
			catalog.PgbackrestBackup{
				Time: catalog.PgbackrestBackupTime{Start: 5000, Stop: 6000},
				WAL:  catalog.PgbackrestBackupWALArchive{Start: wal5, Stop: wal6},
			},
		)
		expectWindow(three, 2000, 6000)
	})

	It("skips errored backups with Start=0", func() {
		mixed := catalogOf(
			catalog.PgbackrestBackup{
				Time: catalog.PgbackrestBackupTime{Start: 0, Stop: 500},
				WAL:  catalog.PgbackrestBackupWALArchive{Start: wal1, Stop: wal2},
			},
			catalog.PgbackrestBackup{
				Time: catalog.PgbackrestBackupTime{Start: 1000, Stop: 2000},
				WAL:  catalog.PgbackrestBackupWALArchive{Start: wal3, Stop: wal4},
			},
		)
		expectWindow(mixed, 2000, 2000)
	})

	It("skips errored backups with Stop=0", func() {
		mixed := catalogOf(
			catalog.PgbackrestBackup{
				Time: catalog.PgbackrestBackupTime{Start: 1000, Stop: 0},
				WAL:  catalog.PgbackrestBackupWALArchive{Start: wal1, Stop: wal2},
			},
			catalog.PgbackrestBackup{
				Time: catalog.PgbackrestBackupTime{Start: 3000, Stop: 4000},
				WAL:  catalog.PgbackrestBackupWALArchive{Start: wal3, Stop: wal4},
			},
		)
		expectWindow(mixed, 4000, 4000)
	})

	It("returns error when all backups are errored", func() {
		allErrored := catalogOf(
			catalog.PgbackrestBackup{
				Time: catalog.PgbackrestBackupTime{Start: 0, Stop: 0},
				WAL:  catalog.PgbackrestBackupWALArchive{Start: wal1, Stop: wal2},
			},
			catalog.PgbackrestBackup{
				Time: catalog.PgbackrestBackupTime{Start: 1000, Stop: 0},
				WAL:  catalog.PgbackrestBackupWALArchive{Start: wal3, Stop: wal4},
			},
		)
		expectFailure(allErrored, "no successful backups found")
	})
})
4 changes: 4 additions & 0 deletions internal/cnpgi/instance/start.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ import (

"github.com/cloudnative-pg/cnpg-i-machinery/pkg/pluginhelper/http"
"github.com/cloudnative-pg/cnpg-i/pkg/backup"
"github.com/cloudnative-pg/cnpg-i/pkg/metrics"
"github.com/cloudnative-pg/cnpg-i/pkg/wal"
"google.golang.org/grpc"
"sigs.k8s.io/controller-runtime/pkg/client"
Expand Down Expand Up @@ -55,6 +56,9 @@ func (c *CNPGI) Start(ctx context.Context) error {
InstanceName: c.InstanceName,
PGDataPath: c.PGDataPath,
})
metrics.RegisterMetricsServer(server, MetricsServiceImplementation{
Client: c.Client,
})
common.AddHealthCheck(server)
return nil
}
Expand Down
13 changes: 13 additions & 0 deletions internal/cnpgi/instance/suite_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
package instance

import (
"testing"

. "github.com/onsi/ginkgo/v2"
. "github.com/onsi/gomega"
)

// TestInstance is the go test entry point for this package's Ginkgo specs.
// It wires Gomega assertion failures to Ginkgo's Fail handler and then runs
// the suite under the name "Instance Suite".
func TestInstance(t *testing.T) {
	// The fail handler is registered before the specs run.
	RegisterFailHandler(Fail)
	RunSpecs(t, "Instance Suite")
}