From 002109c71a89e583bdcdc922b86711f1b2c5f377 Mon Sep 17 00:00:00 2001 From: Jason Lynch Date: Tue, 5 May 2026 14:42:13 -0400 Subject: [PATCH 01/15] fix: use most recently fetched primary instance id Instances are updated sequentially and switchovers can happen during that sequence. We want to use the most recently fetched primary instance ID in the node resource to account for those switchovers. This change is backward compatible and does not need any migration. PLAT-512 --- server/internal/database/instance_resource.go | 38 +++++++++++-------- server/internal/database/node_resource.go | 17 +++++++-- 2 files changed, 35 insertions(+), 20 deletions(-) diff --git a/server/internal/database/instance_resource.go b/server/internal/database/instance_resource.go index a51a395b..d624f8b1 100644 --- a/server/internal/database/instance_resource.go +++ b/server/internal/database/instance_resource.go @@ -31,12 +31,13 @@ func InstanceResourceIdentifier(instanceID string) resource.Identifier { } type InstanceResource struct { - Spec *InstanceSpec `json:"spec"` - InstanceHostname string `json:"instance_hostname"` - PrimaryInstanceID string `json:"primary_instance_id"` - OrchestratorDependencies []resource.Identifier `json:"dependencies"` - ConnectionInfo *ConnectionInfo `json:"connection_info"` - PostInit *Script `json:"post_init"` + Spec *InstanceSpec `json:"spec"` + InstanceHostname string `json:"instance_hostname"` + PrimaryInstanceID string `json:"primary_instance_id"` + PrimaryInstanceIDUpdatedAt time.Time `json:"primary_instance_id_updated_at"` + OrchestratorDependencies []resource.Identifier `json:"dependencies"` + ConnectionInfo *ConnectionInfo `json:"connection_info"` + PostInit *Script `json:"post_init"` } func (r *InstanceResource) ResourceVersion() string { @@ -46,6 +47,7 @@ func (r *InstanceResource) ResourceVersion() string { func (r *InstanceResource) DiffIgnore() []string { return []string{ "/primary_instance_id", + "/primary_instance_id_updated_at", 
"/connection_info", } } @@ -80,13 +82,9 @@ func (r *InstanceResource) Refresh(ctx context.Context, rc *resource.Context) er if err := r.updateConnectionInfo(ctx, rc); err != nil { return resource.ErrNotFound } - - primaryInstanceID, err := GetPrimaryInstanceID(ctx, r.patroniClient(), 30*time.Second) - if err != nil { + if err := r.updatePrimaryInstanceID(ctx, 30*time.Second); err != nil { return resource.ErrNotFound } - r.PrimaryInstanceID = primaryInstanceID - if err := SetScriptNeedsToRun(ctx, rc, r.PostInit); err != nil { return err } @@ -190,16 +188,24 @@ func (r *InstanceResource) Paths(orchestrator Orchestrator) (InstancePaths, erro return paths, nil } -func (r *InstanceResource) initializeInstance(ctx context.Context, rc *resource.Context) error { - patroniClient := r.patroniClient() - primaryInstanceID, err := GetPrimaryInstanceID(ctx, patroniClient, time.Minute) +func (r *InstanceResource) updatePrimaryInstanceID(ctx context.Context, timeout time.Duration) error { + primaryInstanceID, err := GetPrimaryInstanceID(ctx, r.patroniClient(), timeout) if err != nil { return err } r.PrimaryInstanceID = primaryInstanceID + r.PrimaryInstanceIDUpdatedAt = time.Now() + + return nil +} + +func (r *InstanceResource) initializeInstance(ctx context.Context, rc *resource.Context) error { + if err := r.updatePrimaryInstanceID(ctx, time.Minute); err != nil { + return err + } if r.Spec.InstanceID != r.PrimaryInstanceID { - err = r.updateInstanceRecord(ctx, rc, &InstanceUpdateOptions{State: InstanceStateAvailable}) + err := r.updateInstanceRecord(ctx, rc, &InstanceUpdateOptions{State: InstanceStateAvailable}) if err != nil { return r.recordError(ctx, rc, err) } @@ -209,7 +215,7 @@ func (r *InstanceResource) initializeInstance(ctx context.Context, rc *resource. // Enable failsafe mode if this instance is the only one in the node. // Otherwise, disable it. 
- _, err = patroniClient.PatchDynamicConfig(ctx, &patroni.DynamicConfig{ + _, err := r.patroniClient().PatchDynamicConfig(ctx, &patroni.DynamicConfig{ FailsafeMode: utils.PointerTo(r.Spec.NodeSize == 1), }) if err != nil { diff --git a/server/internal/database/node_resource.go b/server/internal/database/node_resource.go index 333ea52a..d1a33f6f 100644 --- a/server/internal/database/node_resource.go +++ b/server/internal/database/node_resource.go @@ -6,6 +6,7 @@ import ( "fmt" "slices" "strings" + "time" "github.com/pgEdge/control-plane/server/internal/postgres" "github.com/pgEdge/control-plane/server/internal/resource" @@ -72,7 +73,8 @@ func (n *NodeResource) Create(ctx context.Context, rc *resource.Context) error { // Some instances may be down or in a bad state. We'll want to check all of // them to find one that knows the primary instance ID. - n.PrimaryInstanceID = "" + var primaryInstanceID string + var primaryInstanceUpdatedAt time.Time for _, id := range n.InstanceIDs { instance, err := resource.FromContext[*InstanceResource](rc, InstanceResourceIdentifier(id)) if errors.Is(err, resource.ErrNotFound) { @@ -81,11 +83,18 @@ func (n *NodeResource) Create(ctx context.Context, rc *resource.Context) error { return fmt.Errorf("failed to get instance %q: %w", id, err) } - if instance.PrimaryInstanceID != "" { - n.PrimaryInstanceID = instance.PrimaryInstanceID - break + if instance.PrimaryInstanceID == "" { + continue + } + // Instances are updated sequentially and a switchover can happen after + // an earlier instance update. We use the 'updated at' field to pick the + // most recently fetched primary instance ID. 
+ if primaryInstanceID == "" || instance.PrimaryInstanceIDUpdatedAt.After(primaryInstanceUpdatedAt) { + primaryInstanceID = instance.PrimaryInstanceID + primaryInstanceUpdatedAt = instance.PrimaryInstanceIDUpdatedAt } } + n.PrimaryInstanceID = primaryInstanceID return nil } From 84ef96a726cb13a20f34b67ab670310e481c056e Mon Sep 17 00:00:00 2001 From: Jason Lynch Date: Tue, 5 May 2026 14:46:19 -0400 Subject: [PATCH 02/15] test: add brief sleep to cancel task test This test still causes issues occasionally due to how quickly it cancels the task after starting it. This sleep is a workaround until we're able to find and fix the underlying issue. PLAT-512 --- e2e/cancel_task_test.go | 5 +++++ 1 file changed, 5 insertions(+) diff --git a/e2e/cancel_task_test.go b/e2e/cancel_task_test.go index 895acd2b..b866fab1 100644 --- a/e2e/cancel_task_test.go +++ b/e2e/cancel_task_test.go @@ -51,6 +51,11 @@ func testCancelDB(t *testing.T) { database := create_resp.Database t.Logf("successfully created cancel task test db") + // TODO: even after many attempts at fixing this, rapidly cancelling a task + // still occasionally causes problems. This sleep is a workaround until + // we're able to track down the issue. + time.Sleep(500 * time.Millisecond) + cancelation_task, err := fixture.Client.CancelDatabaseTask(t.Context(), &controlplane.CancelDatabaseTaskPayload{ DatabaseID: database.ID, TaskID: controlplane.Identifier(creation_task.TaskID), From 1144812f892fa853a1838ade9ca4b005b3029cb5 Mon Sep 17 00:00:00 2001 From: Jason Lynch Date: Mon, 4 May 2026 14:10:49 -0400 Subject: [PATCH 03/15] feat: improve ds.PgEdgeVersion initializers Improves the ds.PgEdgeVersion initializers by: - Renaming `NewPgEdgeVersion`, which parsed a `PgEdgeVersion` from strings to `ParsePgEdgeVersion`. - Add a method to normalize `PgEdgeVersion`s so that the Postgres version contains major and minor components and the Spock version contains just a major component. 
This matches the way that versions are currently provided in our API. PLAT-512 --- server/internal/database/service.go | 4 +- server/internal/database/spec.go | 4 +- server/internal/ds/versions.go | 40 +++++-- server/internal/ds/versions_test.go | 24 ++--- server/internal/host/host_test.go | 102 +++++++++--------- .../common/patroni_config_generator_test.go | 12 +-- server/internal/orchestrator/swarm/images.go | 26 ++--- .../swarm/rag_instance_resources_test.go | 2 +- server/internal/workflows/plan_update.go | 2 +- 9 files changed, 122 insertions(+), 94 deletions(-) diff --git a/server/internal/database/service.go b/server/internal/database/service.go index 5fa90e2a..118f4786 100644 --- a/server/internal/database/service.go +++ b/server/internal/database/service.go @@ -608,7 +608,7 @@ func (s *Service) PopulateSpecDefaults(ctx context.Context, spec *Spec) error { if spec.SpockVersion == "" { spec.SpockVersion = defaultVersion.SpockVersion.String() } - specVersion, err := ds.NewPgEdgeVersion(spec.PostgresVersion, spec.SpockVersion) + specVersion, err := ds.ParsePgEdgeVersion(spec.PostgresVersion, spec.SpockVersion) if err != nil { return fmt.Errorf("failed to parse versions from spec: %w", err) } @@ -628,7 +628,7 @@ func (s *Service) PopulateSpecDefaults(ctx context.Context, spec *Spec) error { return fmt.Errorf("host %s not found in host list", hostID) } if node.PostgresVersion != "" { - nodeVersion, err := ds.NewPgEdgeVersion(node.PostgresVersion, spec.SpockVersion) + nodeVersion, err := ds.ParsePgEdgeVersion(node.PostgresVersion, spec.SpockVersion) if err != nil { return fmt.Errorf("failed to parse versions from nodes[%d] spec: %w", idx, err) } diff --git a/server/internal/database/spec.go b/server/internal/database/spec.go index 4d3c024d..d3985b5e 100644 --- a/server/internal/database/spec.go +++ b/server/internal/database/spec.go @@ -622,7 +622,7 @@ func (n *NodeInstances) InstanceIDs() []string { } func (s *Spec) NodeInstances() ([]*NodeInstances, error) { - 
specVersion, err := ds.NewPgEdgeVersion(s.PostgresVersion, s.SpockVersion) + specVersion, err := ds.ParsePgEdgeVersion(s.PostgresVersion, s.SpockVersion) if err != nil { return nil, fmt.Errorf("failed to parse version from spec: %w", err) } @@ -659,7 +659,7 @@ func (s *Spec) NodeInstances() ([]*NodeInstances, error) { // Respect node-level overrides nodeVersion := specVersion if node.PostgresVersion != "" { - nodeVersion, err = ds.NewPgEdgeVersion(node.PostgresVersion, s.SpockVersion) + nodeVersion, err = ds.ParsePgEdgeVersion(node.PostgresVersion, s.SpockVersion) if err != nil { return nil, fmt.Errorf("failed to parse version from node spec: %w", err) } diff --git a/server/internal/ds/versions.go b/server/internal/ds/versions.go index 21ef942e..5128dd3f 100644 --- a/server/internal/ds/versions.go +++ b/server/internal/ds/versions.go @@ -73,6 +73,16 @@ func (v *Version) MajorVersion() *Version { } } +func (v *Version) MajorMinorVersion() *Version { + components := slices.Clone(v.Components) + if len(components) > 2 { + components = components[:2] + } + return &Version{ + Components: components, + } +} + func (v *Version) String() string { components := make([]string, len(v.Components)) for i, c := range v.Components { @@ -193,24 +203,42 @@ func (v *PgEdgeVersion) GreaterThan(other *PgEdgeVersion) bool { return v.Compare(other) > 0 } -func MustPgEdgeVersion(postgresVersion, spockVersion string) *PgEdgeVersion { - v, err := NewPgEdgeVersion(postgresVersion, spockVersion) +// Normalize returns the Postgres version in major.minor format and the Spock +// version in major format. This matches the way that versions are currently +// provided in our API. 
+func (v *PgEdgeVersion) Normalize() (*PgEdgeVersion, error) { + pv := v.PostgresVersion.MajorMinorVersion() + if len(pv.Components) != 2 { + return nil, fmt.Errorf("expected at least a major and minor version for postgres, got '%s'", pv) + } + sv := v.SpockVersion.MajorVersion() + if len(sv.Components) != 1 { + return nil, fmt.Errorf("expected at least a major version for spock, got '%s'", sv) + } + + return &PgEdgeVersion{ + PostgresVersion: pv, + SpockVersion: sv, + }, nil +} + +func MustParsePgEdgeVersion(postgresVersion, spockVersion string) *PgEdgeVersion { + v, err := ParsePgEdgeVersion(postgresVersion, spockVersion) if err != nil { panic(err) } return v } -func NewPgEdgeVersion(postgresVersion, spockVersion string) (*PgEdgeVersion, error) { +func ParsePgEdgeVersion(postgresVersion, spockVersion string) (*PgEdgeVersion, error) { pv, err := ParseVersion(postgresVersion) if err != nil { - return nil, fmt.Errorf("invalid postgres version: %q", postgresVersion) + return nil, fmt.Errorf("invalid postgres version: '%s'", postgresVersion) } sv, err := ParseVersion(spockVersion) if err != nil { - return nil, fmt.Errorf("invalid spock version: %q", spockVersion) + return nil, fmt.Errorf("invalid spock version: '%s'", spockVersion) } - return &PgEdgeVersion{ PostgresVersion: pv, SpockVersion: sv, diff --git a/server/internal/ds/versions_test.go b/server/internal/ds/versions_test.go index a72e0cdb..7b19dd90 100644 --- a/server/internal/ds/versions_test.go +++ b/server/internal/ds/versions_test.go @@ -230,7 +230,7 @@ func TestNewPgEdgeVersion(t *testing.T) { }, } { t.Run(tc.postgresVersion+"_"+tc.spockVersion, func(t *testing.T) { - result, err := ds.NewPgEdgeVersion(tc.postgresVersion, tc.spockVersion) + result, err := ds.ParsePgEdgeVersion(tc.postgresVersion, tc.spockVersion) if tc.expectedErr != "" { assert.Nil(t, result) assert.ErrorContains(t, err, tc.expectedErr) @@ -244,7 +244,7 @@ func TestNewPgEdgeVersion(t *testing.T) { func TestPgEdgeVersion(t *testing.T) { 
t.Run("String", func(t *testing.T) { - version := ds.MustPgEdgeVersion("17.6", "5.0.0") + version := ds.MustParsePgEdgeVersion("17.6", "5.0.0") assert.Equal(t, "17.6_5.0.0", version.String()) }) @@ -255,28 +255,28 @@ func TestPgEdgeVersion(t *testing.T) { expected int }{ { - a: ds.MustPgEdgeVersion("17.6", "5.0.0"), - b: ds.MustPgEdgeVersion("17.6", "5.0.0"), + a: ds.MustParsePgEdgeVersion("17.6", "5.0.0"), + b: ds.MustParsePgEdgeVersion("17.6", "5.0.0"), expected: 0, }, { - a: ds.MustPgEdgeVersion("18.0", "5.0.0"), - b: ds.MustPgEdgeVersion("17.6", "5.0.0"), + a: ds.MustParsePgEdgeVersion("18.0", "5.0.0"), + b: ds.MustParsePgEdgeVersion("17.6", "5.0.0"), expected: 1, }, { - a: ds.MustPgEdgeVersion("17.6", "5.0.0"), - b: ds.MustPgEdgeVersion("18.0", "5.0.0"), + a: ds.MustParsePgEdgeVersion("17.6", "5.0.0"), + b: ds.MustParsePgEdgeVersion("18.0", "5.0.0"), expected: -1, }, { - a: ds.MustPgEdgeVersion("17.6", "5.0.0"), - b: ds.MustPgEdgeVersion("17.6", "5.0.1"), + a: ds.MustParsePgEdgeVersion("17.6", "5.0.0"), + b: ds.MustParsePgEdgeVersion("17.6", "5.0.1"), expected: -1, }, { - a: ds.MustPgEdgeVersion("17.6", "5.0.0"), - b: ds.MustPgEdgeVersion("17.6", "4.10.0"), + a: ds.MustParsePgEdgeVersion("17.6", "5.0.0"), + b: ds.MustParsePgEdgeVersion("17.6", "4.10.0"), expected: 1, }, } { diff --git a/server/internal/host/host_test.go b/server/internal/host/host_test.go index 97a76bdf..f8ab283a 100644 --- a/server/internal/host/host_test.go +++ b/server/internal/host/host_test.go @@ -20,78 +20,78 @@ func TestGreatestCommonDefaultVersion(t *testing.T) { { name: "same supported versions", defaultVersions: []*ds.PgEdgeVersion{ - ds.MustPgEdgeVersion("17.6", "5"), - ds.MustPgEdgeVersion("17.6", "5"), - ds.MustPgEdgeVersion("17.6", "5"), + ds.MustParsePgEdgeVersion("17.6", "5"), + ds.MustParsePgEdgeVersion("17.6", "5"), + ds.MustParsePgEdgeVersion("17.6", "5"), }, supportedVersions: [][]*ds.PgEdgeVersion{ { - ds.MustPgEdgeVersion("16.10", "5"), - ds.MustPgEdgeVersion("18.1", 
"5"), - ds.MustPgEdgeVersion("17.6", "5"), + ds.MustParsePgEdgeVersion("16.10", "5"), + ds.MustParsePgEdgeVersion("18.1", "5"), + ds.MustParsePgEdgeVersion("17.6", "5"), }, { - ds.MustPgEdgeVersion("16.10", "5"), - ds.MustPgEdgeVersion("18.1", "5"), - ds.MustPgEdgeVersion("17.6", "5"), + ds.MustParsePgEdgeVersion("16.10", "5"), + ds.MustParsePgEdgeVersion("18.1", "5"), + ds.MustParsePgEdgeVersion("17.6", "5"), }, { - ds.MustPgEdgeVersion("16.10", "5"), - ds.MustPgEdgeVersion("18.1", "5"), - ds.MustPgEdgeVersion("17.6", "5"), + ds.MustParsePgEdgeVersion("16.10", "5"), + ds.MustParsePgEdgeVersion("18.1", "5"), + ds.MustParsePgEdgeVersion("17.6", "5"), }, }, - expected: ds.MustPgEdgeVersion("17.6", "5"), + expected: ds.MustParsePgEdgeVersion("17.6", "5"), }, { name: "one newer", defaultVersions: []*ds.PgEdgeVersion{ - ds.MustPgEdgeVersion("17.7", "5"), - ds.MustPgEdgeVersion("17.6", "5"), - ds.MustPgEdgeVersion("17.6", "5"), + ds.MustParsePgEdgeVersion("17.7", "5"), + ds.MustParsePgEdgeVersion("17.6", "5"), + ds.MustParsePgEdgeVersion("17.6", "5"), }, supportedVersions: [][]*ds.PgEdgeVersion{ { - ds.MustPgEdgeVersion("16.10", "5"), - ds.MustPgEdgeVersion("18.1", "5"), - ds.MustPgEdgeVersion("17.6", "5"), - ds.MustPgEdgeVersion("17.7", "5"), + ds.MustParsePgEdgeVersion("16.10", "5"), + ds.MustParsePgEdgeVersion("18.1", "5"), + ds.MustParsePgEdgeVersion("17.6", "5"), + ds.MustParsePgEdgeVersion("17.7", "5"), }, { - ds.MustPgEdgeVersion("16.10", "5"), - ds.MustPgEdgeVersion("18.1", "5"), - ds.MustPgEdgeVersion("17.6", "5"), + ds.MustParsePgEdgeVersion("16.10", "5"), + ds.MustParsePgEdgeVersion("18.1", "5"), + ds.MustParsePgEdgeVersion("17.6", "5"), }, { - ds.MustPgEdgeVersion("16.10", "5"), - ds.MustPgEdgeVersion("18.1", "5"), - ds.MustPgEdgeVersion("17.6", "5"), + ds.MustParsePgEdgeVersion("16.10", "5"), + ds.MustParsePgEdgeVersion("18.1", "5"), + ds.MustParsePgEdgeVersion("17.6", "5"), }, }, - expected: ds.MustPgEdgeVersion("17.6", "5"), + expected: 
ds.MustParsePgEdgeVersion("17.6", "5"), }, { name: "no overlaps", defaultVersions: []*ds.PgEdgeVersion{ - ds.MustPgEdgeVersion("18.0", "6"), - ds.MustPgEdgeVersion("17.6", "5"), - ds.MustPgEdgeVersion("17.6", "5"), + ds.MustParsePgEdgeVersion("18.0", "6"), + ds.MustParsePgEdgeVersion("17.6", "5"), + ds.MustParsePgEdgeVersion("17.6", "5"), }, supportedVersions: [][]*ds.PgEdgeVersion{ { - ds.MustPgEdgeVersion("16.11", "6"), - ds.MustPgEdgeVersion("17.7", "6"), - ds.MustPgEdgeVersion("18.0", "6"), + ds.MustParsePgEdgeVersion("16.11", "6"), + ds.MustParsePgEdgeVersion("17.7", "6"), + ds.MustParsePgEdgeVersion("18.0", "6"), }, { - ds.MustPgEdgeVersion("16.10", "5"), - ds.MustPgEdgeVersion("18.1", "5"), - ds.MustPgEdgeVersion("17.6", "5"), + ds.MustParsePgEdgeVersion("16.10", "5"), + ds.MustParsePgEdgeVersion("18.1", "5"), + ds.MustParsePgEdgeVersion("17.6", "5"), }, { - ds.MustPgEdgeVersion("16.10", "5"), - ds.MustPgEdgeVersion("18.1", "5"), - ds.MustPgEdgeVersion("17.6", "5"), + ds.MustParsePgEdgeVersion("16.10", "5"), + ds.MustParsePgEdgeVersion("18.1", "5"), + ds.MustParsePgEdgeVersion("17.6", "5"), }, }, expectedErr: "no common default versions found between the given hosts", @@ -104,25 +104,25 @@ func TestGreatestCommonDefaultVersion(t *testing.T) { // version. 
name: "no overlapping defaults", defaultVersions: []*ds.PgEdgeVersion{ - ds.MustPgEdgeVersion("18.0", "6"), - ds.MustPgEdgeVersion("18.1", "5"), - ds.MustPgEdgeVersion("18.1", "5"), + ds.MustParsePgEdgeVersion("18.0", "6"), + ds.MustParsePgEdgeVersion("18.1", "5"), + ds.MustParsePgEdgeVersion("18.1", "5"), }, supportedVersions: [][]*ds.PgEdgeVersion{ { - ds.MustPgEdgeVersion("16.11", "5"), - ds.MustPgEdgeVersion("17.6", "5"), - ds.MustPgEdgeVersion("18.0", "5"), + ds.MustParsePgEdgeVersion("16.11", "5"), + ds.MustParsePgEdgeVersion("17.6", "5"), + ds.MustParsePgEdgeVersion("18.0", "5"), }, { - ds.MustPgEdgeVersion("16.10", "5"), - ds.MustPgEdgeVersion("18.1", "5"), - ds.MustPgEdgeVersion("17.6", "5"), + ds.MustParsePgEdgeVersion("16.10", "5"), + ds.MustParsePgEdgeVersion("18.1", "5"), + ds.MustParsePgEdgeVersion("17.6", "5"), }, { - ds.MustPgEdgeVersion("16.10", "5"), - ds.MustPgEdgeVersion("18.1", "5"), - ds.MustPgEdgeVersion("17.6", "5"), + ds.MustParsePgEdgeVersion("16.10", "5"), + ds.MustParsePgEdgeVersion("18.1", "5"), + ds.MustParsePgEdgeVersion("17.6", "5"), }, }, expectedErr: "no common default versions found between the given hosts", diff --git a/server/internal/orchestrator/common/patroni_config_generator_test.go b/server/internal/orchestrator/common/patroni_config_generator_test.go index aabfee36..b495f51b 100644 --- a/server/internal/orchestrator/common/patroni_config_generator_test.go +++ b/server/internal/orchestrator/common/patroni_config_generator_test.go @@ -52,7 +52,7 @@ func TestPatroniConfigGenerator(t *testing.T) { DatabaseName: "app", NodeName: "n1", NodeOrdinal: 1, - PgEdgeVersion: ds.MustPgEdgeVersion("18.1", "5.0.4"), + PgEdgeVersion: ds.MustParsePgEdgeVersion("18.1", "5.0.4"), ClusterSize: 3, }, HostCPUs: 4, @@ -109,7 +109,7 @@ func TestPatroniConfigGenerator(t *testing.T) { DatabaseName: "app", NodeName: "n1", NodeOrdinal: 1, - PgEdgeVersion: ds.MustPgEdgeVersion("18.1", "5.0.4"), + PgEdgeVersion: ds.MustParsePgEdgeVersion("18.1", 
"5.0.4"), ClusterSize: 3, BackupConfig: &database.BackupConfig{}, }, @@ -167,7 +167,7 @@ func TestPatroniConfigGenerator(t *testing.T) { DatabaseName: "app", NodeName: "n1", NodeOrdinal: 1, - PgEdgeVersion: ds.MustPgEdgeVersion("18.1", "5.0.4"), + PgEdgeVersion: ds.MustParsePgEdgeVersion("18.1", "5.0.4"), ClusterSize: 3, RestoreConfig: &database.RestoreConfig{}, }, @@ -225,7 +225,7 @@ func TestPatroniConfigGenerator(t *testing.T) { DatabaseName: "app", NodeName: "n1", NodeOrdinal: 1, - PgEdgeVersion: ds.MustPgEdgeVersion("18.1", "5.0.4"), + PgEdgeVersion: ds.MustParsePgEdgeVersion("18.1", "5.0.4"), ClusterSize: 3, RestoreConfig: &database.RestoreConfig{}, InPlaceRestore: true, @@ -284,7 +284,7 @@ func TestPatroniConfigGenerator(t *testing.T) { DatabaseName: "app", NodeName: "n1", NodeOrdinal: 1, - PgEdgeVersion: ds.MustPgEdgeVersion("18.1", "5.0.4"), + PgEdgeVersion: ds.MustParsePgEdgeVersion("18.1", "5.0.4"), ClusterSize: 3, }, HostCPUs: 4, @@ -325,7 +325,7 @@ func TestPatroniConfigGenerator(t *testing.T) { DatabaseName: "app", NodeName: "n1", NodeOrdinal: 1, - PgEdgeVersion: ds.MustPgEdgeVersion("18.1", "5.0.4"), + PgEdgeVersion: ds.MustParsePgEdgeVersion("18.1", "5.0.4"), ClusterSize: 3, }, HostCPUs: 4, diff --git a/server/internal/orchestrator/swarm/images.go b/server/internal/orchestrator/swarm/images.go index a474c252..fe59d251 100644 --- a/server/internal/orchestrator/swarm/images.go +++ b/server/internal/orchestrator/swarm/images.go @@ -25,49 +25,49 @@ func NewVersions(cfg config.Config) *Versions { } // pg16 - versions.addImage(ds.MustPgEdgeVersion("16.10", "5"), &Images{ + versions.addImage(ds.MustParsePgEdgeVersion("16.10", "5"), &Images{ PgEdgeImage: imageTag(cfg, "16.10-spock5.0.4-standard-3"), }) - versions.addImage(ds.MustPgEdgeVersion("16.11", "5"), &Images{ + versions.addImage(ds.MustParsePgEdgeVersion("16.11", "5"), &Images{ PgEdgeImage: imageTag(cfg, "16.11-spock5.0.4-standard-4"), }) - versions.addImage(ds.MustPgEdgeVersion("16.12", "5"), 
&Images{ + versions.addImage(ds.MustParsePgEdgeVersion("16.12", "5"), &Images{ PgEdgeImage: imageTag(cfg, "16.12-spock5.0.5-standard-1"), }) - versions.addImage(ds.MustPgEdgeVersion("16.13", "5"), &Images{ + versions.addImage(ds.MustParsePgEdgeVersion("16.13", "5"), &Images{ PgEdgeImage: imageTag(cfg, "16.13-spock5.0.7-standard-1"), }) // pg17 - versions.addImage(ds.MustPgEdgeVersion("17.6", "5"), &Images{ + versions.addImage(ds.MustParsePgEdgeVersion("17.6", "5"), &Images{ PgEdgeImage: imageTag(cfg, "17.6-spock5.0.4-standard-3"), }) - versions.addImage(ds.MustPgEdgeVersion("17.7", "5"), &Images{ + versions.addImage(ds.MustParsePgEdgeVersion("17.7", "5"), &Images{ PgEdgeImage: imageTag(cfg, "17.7-spock5.0.4-standard-4"), }) - versions.addImage(ds.MustPgEdgeVersion("17.8", "5"), &Images{ + versions.addImage(ds.MustParsePgEdgeVersion("17.8", "5"), &Images{ PgEdgeImage: imageTag(cfg, "17.8-spock5.0.5-standard-1"), }) - versions.addImage(ds.MustPgEdgeVersion("17.9", "5"), &Images{ + versions.addImage(ds.MustParsePgEdgeVersion("17.9", "5"), &Images{ PgEdgeImage: imageTag(cfg, "17.9-spock5.0.7-standard-1"), }) // pg18 - versions.addImage(ds.MustPgEdgeVersion("18.0", "5"), &Images{ + versions.addImage(ds.MustParsePgEdgeVersion("18.0", "5"), &Images{ PgEdgeImage: imageTag(cfg, "18.0-spock5.0.4-standard-3"), }) - versions.addImage(ds.MustPgEdgeVersion("18.1", "5"), &Images{ + versions.addImage(ds.MustParsePgEdgeVersion("18.1", "5"), &Images{ PgEdgeImage: imageTag(cfg, "18.1-spock5.0.4-standard-4"), }) - versions.addImage(ds.MustPgEdgeVersion("18.2", "5"), &Images{ + versions.addImage(ds.MustParsePgEdgeVersion("18.2", "5"), &Images{ PgEdgeImage: imageTag(cfg, "18.2-spock5.0.5-standard-1"), }) - versions.addImage(ds.MustPgEdgeVersion("18.3", "5"), &Images{ + versions.addImage(ds.MustParsePgEdgeVersion("18.3", "5"), &Images{ PgEdgeImage: imageTag(cfg, "18.3-spock5.0.7-standard-1"), }) - versions.defaultVersion = ds.MustPgEdgeVersion("18.3", "5") + versions.defaultVersion = 
ds.MustParsePgEdgeVersion("18.3", "5") return versions } diff --git a/server/internal/orchestrator/swarm/rag_instance_resources_test.go b/server/internal/orchestrator/swarm/rag_instance_resources_test.go index aa4d3355..383d07b2 100644 --- a/server/internal/orchestrator/swarm/rag_instance_resources_test.go +++ b/server/internal/orchestrator/swarm/rag_instance_resources_test.go @@ -227,7 +227,7 @@ func TestGenerateRAGInstanceResources_IncompatibleVersion(t *testing.T) { NodeName: "n1", ConnectAsUsername: "app_read_only", ConnectAsPassword: "secret", - PgEdgeVersion: ds.MustPgEdgeVersion("17", "5.0.0"), + PgEdgeVersion: ds.MustParsePgEdgeVersion("17", "5.0.0"), } _, err := o.generateRAGInstanceResources(spec) diff --git a/server/internal/workflows/plan_update.go b/server/internal/workflows/plan_update.go index f8947f97..162380ce 100644 --- a/server/internal/workflows/plan_update.go +++ b/server/internal/workflows/plan_update.go @@ -99,7 +99,7 @@ func (w *Workflows) getServiceResources( nodeInstances []*database.NodeInstances, ) (*operations.ServiceResources, error) { serviceInstanceID := database.GenerateServiceInstanceID(spec.DatabaseID, serviceSpec.ServiceID, hostID) - pgEdgeVersion, err := ds.NewPgEdgeVersion(spec.PostgresVersion, spec.SpockVersion) + pgEdgeVersion, err := ds.ParsePgEdgeVersion(spec.PostgresVersion, spec.SpockVersion) if err != nil { return nil, fmt.Errorf("failed to parse pgedge version: %w", err) } From 4e56f61b21a2c1aa96e4de5e3da72750d7e2e7b8 Mon Sep 17 00:00:00 2001 From: Jason Lynch Date: Mon, 4 May 2026 14:15:22 -0400 Subject: [PATCH 04/15] feat: add conditions to etcd txn Adds an `AddConditions` method to `storage.Txn` to make it possible to add additional constraints to a transaction. This enables us to enforce other constraints that are outside of the values we're updating. 
PLAT-512 --- server/internal/storage/interface.go | 1 + server/internal/storage/txn.go | 6 ++++++ 2 files changed, 7 insertions(+) diff --git a/server/internal/storage/interface.go b/server/internal/storage/interface.go index 6c5d9589..7b8d23a0 100644 --- a/server/internal/storage/interface.go +++ b/server/internal/storage/interface.go @@ -46,6 +46,7 @@ type TxnOperation interface { // on a unique key. type Txn interface { AddOps(ops ...TxnOperation) + AddConditions(cmps ...clientv3.Cmp) Commit(ctx context.Context) error } diff --git a/server/internal/storage/txn.go b/server/internal/storage/txn.go index 93ae5b0d..3cc379f2 100644 --- a/server/internal/storage/txn.go +++ b/server/internal/storage/txn.go @@ -11,6 +11,7 @@ import ( type txn struct { ops []TxnOperation client *clientv3.Client + cmps []clientv3.Cmp } func NewTxn(client *clientv3.Client, ops ...TxnOperation) Txn { @@ -24,6 +25,10 @@ func (t *txn) AddOps(ops ...TxnOperation) { t.ops = append(t.ops, ops...) } +func (t *txn) AddConditions(cmps ...clientv3.Cmp) { + t.cmps = append(t.cmps, cmps...) +} + func (t *txn) Commit(ctx context.Context) error { var allOps []clientv3.Op var allCmps []clientv3.Cmp @@ -43,6 +48,7 @@ func (t *txn) Commit(ctx context.Context) error { cachedOps = append(cachedOps, c) } } + allCmps = append(allCmps, t.cmps...) // Etcd will reject the transaction if there are duplicate keys, and it // doesn't give a helpful error message. We can produce a better error by From cdc5e9c7f66d3da595a4ce3b11494ad2e56291a0 Mon Sep 17 00:00:00 2001 From: Jason Lynch Date: Mon, 4 May 2026 14:17:54 -0400 Subject: [PATCH 05/15] feat: update host in host monitor Modifies the existing host monitor to also update the host record periodically. This accounts for system changes that happen while the Control Plane is running, such as system package updates. 
PLAT-512 --- server/internal/monitor/host_monitor.go | 13 ++++++++++--- 1 file changed, 10 insertions(+), 3 deletions(-) diff --git a/server/internal/monitor/host_monitor.go b/server/internal/monitor/host_monitor.go index 9575c116..f6e19cc9 100644 --- a/server/internal/monitor/host_monitor.go +++ b/server/internal/monitor/host_monitor.go @@ -2,6 +2,7 @@ package monitor import ( "context" + "fmt" "github.com/rs/zerolog" @@ -23,7 +24,7 @@ func NewHostMonitor( m.monitor = NewMonitor( logger, host.HostMonitorRefreshInterval, - m.checkStatus, + m.update, ) return m } @@ -36,7 +37,13 @@ func (m *HostMonitor) Stop() { m.monitor.Stop() } -func (m *HostMonitor) checkStatus(ctx context.Context) error { - return m.svc.UpdateHostStatus(ctx) +func (m *HostMonitor) update(ctx context.Context) error { + if err := m.svc.UpdateHost(ctx); err != nil { + return fmt.Errorf("failed to update host: %w", err) + } + if err := m.svc.UpdateHostStatus(ctx); err != nil { + return fmt.Errorf("failed to update host status: %w", err) + } + return nil } From edaf9b49b827417e7dd3041e6cbb653cc3cee1f6 Mon Sep 17 00:00:00 2001 From: Jason Lynch Date: Mon, 4 May 2026 14:19:36 -0400 Subject: [PATCH 06/15] chore: remove empty file PLAT-512 --- server/internal/database/status.go | 4 ---- 1 file changed, 4 deletions(-) delete mode 100644 server/internal/database/status.go diff --git a/server/internal/database/status.go b/server/internal/database/status.go deleted file mode 100644 index 3ed55037..00000000 --- a/server/internal/database/status.go +++ /dev/null @@ -1,4 +0,0 @@ -package database - -// type InstanceStatus struct { -// } From c01e9079c4c7fcabdb903b88378252415e1cc6a5 Mon Sep 17 00:00:00 2001 From: Jason Lynch Date: Mon, 4 May 2026 14:20:21 -0400 Subject: [PATCH 07/15] feat: record versions for replica instances Updates the existing instance monitor to query and record the Postgres and Spock versions from replica instances. 
We'll need this to determine when a node has been updated by an external process, such as a system package update. PLAT-512 --- server/internal/monitor/instance_monitor.go | 41 ++++++++++----------- 1 file changed, 20 insertions(+), 21 deletions(-) diff --git a/server/internal/monitor/instance_monitor.go b/server/internal/monitor/instance_monitor.go index 57ef2505..2385fa0f 100644 --- a/server/internal/monitor/instance_monitor.go +++ b/server/internal/monitor/instance_monitor.go @@ -89,12 +89,9 @@ func (m *InstanceMonitor) checkStatus(ctx context.Context) error { if err != nil { return m.updateInstanceErrStatus(ctx, status, err) } - - if status.IsPrimary() { - err = m.populateFromDbConn(ctx, dbState, info, tlsCfg, status) - if err != nil { - return m.updateInstanceErrStatus(ctx, status, err) - } + err = m.populateFromDbConn(ctx, dbState, info, tlsCfg, status) + if err != nil { + return m.updateInstanceErrStatus(ctx, status, err) } currentInstance, err := m.dbSvc.GetInstance(ctx, m.databaseID, m.instanceID) if err != nil { @@ -160,22 +157,24 @@ func (m *InstanceMonitor) populateFromDbConn( } status.SpockVersion = utils.PointerTo(spockVersion) - spockReadOnly, err := postgres.GetSpockReadOnly().Scalar(ctx, conn) - if err != nil { - return fmt.Errorf("failed to query spock read-only status: %w", err) - } - status.ReadOnly = utils.PointerTo(spockReadOnly) + if status.IsPrimary() { + spockReadOnly, err := postgres.GetSpockReadOnly().Scalar(ctx, conn) + if err != nil { + return fmt.Errorf("failed to query spock read-only status: %w", err) + } + status.ReadOnly = utils.PointerTo(spockReadOnly) - subStatuses, err := postgres.GetSubscriptionStatuses().Scalars(ctx, conn) - if err != nil { - return fmt.Errorf("failed to query subscription statuses: %w", err) - } - for _, sub := range subStatuses { - status.Subscriptions = append(status.Subscriptions, database.SubscriptionStatus{ - ProviderNode: sub.ProviderNode, - Name: sub.SubscriptionName, - Status: sub.Status, - }) + 
subStatuses, err := postgres.GetSubscriptionStatuses().Scalars(ctx, conn) + if err != nil { + return fmt.Errorf("failed to query subscription statuses: %w", err) + } + for _, sub := range subStatuses { + status.Subscriptions = append(status.Subscriptions, database.SubscriptionStatus{ + ProviderNode: sub.ProviderNode, + Name: sub.SubscriptionName, + Status: sub.Status, + }) + } } return nil From f1f29d0f4ff2c819aae50679641b3504ce3ce54a Mon Sep 17 00:00:00 2001 From: Jason Lynch Date: Mon, 4 May 2026 14:23:32 -0400 Subject: [PATCH 08/15] feat: add service method to reconcile db versions Adds a new database service method to reconcile the Postgres and Spock versions that we observe through the instance monitors with the versions in the database spec. The domain logic of this process is split into a standalone function with unit tests to enforce its behavior. The service method uses a transaction with additional constraints to handle conflicts with any database operations that run while we're performing this check. The summary of this new behavior is that when an instance is updated, we update its corresponding `Instance` record in our database. If all of the instances have been updated for a node, we update the node's entry in the database spec. If all of the nodes have been updated, we normalize the database spec by setting the top-level version fields and zeroing out the node-level version fields. 
PLAT-512 --- server/internal/database/instance.go | 7 +- server/internal/database/instance_store.go | 5 + server/internal/database/provide.go | 7 +- .../internal/database/reconcile_versions.go | 214 ++++++++ .../database/reconcile_versions_test.go | 474 ++++++++++++++++++ server/internal/database/service.go | 5 + server/internal/database/spec.go | 24 + server/internal/logging/factory.go | 1 + 8 files changed, 734 insertions(+), 3 deletions(-) create mode 100644 server/internal/database/reconcile_versions.go create mode 100644 server/internal/database/reconcile_versions_test.go diff --git a/server/internal/database/instance.go b/server/internal/database/instance.go index b4343088..0c647ccf 100644 --- a/server/internal/database/instance.go +++ b/server/internal/database/instance.go @@ -111,6 +111,10 @@ func (s *InstanceStatus) IsPrimary() bool { return s.Role != nil && *s.Role == patroni.InstanceRolePrimary } +func (s *InstanceStatus) IsStale() bool { + return s.StatusUpdatedAt == nil || s.StatusUpdatedAt.Before(time.Now().Add(-2*InstanceMonitorRefreshInterval)) +} + func storedToInstance(instance *StoredInstance, status *StoredInstanceStatus) *Instance { if instance == nil { return nil @@ -135,8 +139,7 @@ func storedToInstance(instance *StoredInstance, status *StoredInstanceStatus) *I // We want to infer the instance state if the instance is supposed to be // available. 
if out.State == InstanceStateAvailable && status != nil { - breakpoint := time.Now().Add(-2 * InstanceMonitorRefreshInterval) - if out.Status.StatusUpdatedAt.Before(breakpoint) { + if out.Status.IsStale() { out.State = InstanceStateUnknown out.Status = nil return out diff --git a/server/internal/database/instance_store.go b/server/internal/database/instance_store.go index 047af123..755c67c7 100644 --- a/server/internal/database/instance_store.go +++ b/server/internal/database/instance_store.go @@ -137,6 +137,11 @@ func (s *InstanceStore) Put(item *StoredInstance) storage.PutOp[*StoredInstance] return storage.NewPutOp(s.client, key, item) } +func (s *InstanceStore) Update(item *StoredInstance) storage.PutOp[*StoredInstance] { + key := s.Key(item.DatabaseID, item.InstanceID) + return storage.NewUpdateOp(s.client, key, item) +} + func (s *InstanceStore) DeleteByKey(databaseID, instanceID string) storage.DeleteOp { key := s.Key(databaseID, instanceID) return storage.NewDeleteKeyOp(s.client, key) diff --git a/server/internal/database/provide.go b/server/internal/database/provide.go index ce7c6848..62fb6ec9 100644 --- a/server/internal/database/provide.go +++ b/server/internal/database/provide.go @@ -6,6 +6,7 @@ import ( "github.com/pgEdge/control-plane/server/internal/config" "github.com/pgEdge/control-plane/server/internal/host" + "github.com/pgEdge/control-plane/server/internal/logging" "github.com/pgEdge/control-plane/server/internal/ports" ) @@ -36,7 +37,11 @@ func provideService(i *do.Injector) { if err != nil { return nil, err } - return NewService(cfg, orch, store, hostSvc, portsSvc), nil + loggerFactory, err := do.Invoke[*logging.Factory](i) + if err != nil { + return nil, err + } + return NewService(cfg, orch, store, hostSvc, portsSvc, loggerFactory), nil }) } diff --git a/server/internal/database/reconcile_versions.go b/server/internal/database/reconcile_versions.go new file mode 100644 index 00000000..b27dccf7 --- /dev/null +++ 
b/server/internal/database/reconcile_versions.go @@ -0,0 +1,214 @@ +package database + +import ( + "context" + "errors" + "fmt" + + clientv3 "go.etcd.io/etcd/client/v3" + + "github.com/pgEdge/control-plane/server/internal/ds" + "github.com/pgEdge/control-plane/server/internal/storage" + "github.com/pgEdge/control-plane/server/internal/utils" +) + +func (s *Service) ReconcileAllDatabaseVersions(ctx context.Context) error { + databases, err := s.store.Database.GetAll().Exec(ctx) + if err != nil { + return fmt.Errorf("failed to get databases: %w", err) + } + for _, database := range databases { + if database.State.IsInProgress() { + continue + } + spec, err := s.store.Spec. + GetByKey(database.DatabaseID). + Exec(ctx) + if errors.Is(err, storage.ErrNotFound) { + continue + } else if err != nil { + return fmt.Errorf("failed to get spec for database '%s': %w", database.DatabaseID, err) + } + instances, err := s.store.Instance. + GetByDatabaseID(database.DatabaseID). + Exec(ctx) + if err != nil { + return fmt.Errorf("failed to get instances for database '%s': %w", database.DatabaseID, err) + } + instanceStatuses, err := s.store.InstanceStatus. + GetByDatabaseID(database.DatabaseID). + Exec(ctx) + if err != nil { + return fmt.Errorf("failed to get instance statuses for database '%s': %w", database.DatabaseID, err) + } + + logger := s.logger.With(). + Str("database_id", database.DatabaseID). + Logger() + + var ops []storage.TxnOperation + updatedSpec, updatedInstances := ReconcileVersions(spec, instances, instanceStatuses) + for _, instance := range updatedInstances { + logger.Info(). + Str("node_name", instance.NodeName). + Str("host_id", instance.HostID). + Str("instance_id", instance.InstanceID). + Stringer("postgres_version", instance.PgEdgeVersion.PostgresVersion). + Stringer("spock_version", instance.PgEdgeVersion.SpockVersion). + Msg("detected updated instance version") + + instanceSpec, err := s.store.InstanceSpec. 
+ GetByKey(instance.DatabaseID, instance.InstanceID). + Exec(ctx) + if err != nil && !errors.Is(err, storage.ErrNotFound) { + return fmt.Errorf("failed to get instance spec for instance '%s': %w", instance.InstanceID, err) + } else if err == nil { + instanceSpec.Spec.PgEdgeVersion = instance.PgEdgeVersion + ops = append(ops, s.store.InstanceSpec.Update(instanceSpec)) + } + + ops = append(ops, s.store.Instance.Update(instance)) + } + if updatedSpec != nil { + logger.Info().Msg("detected updated node versions") + ops = append(ops, s.store.Spec.Update(updatedSpec)) + } + if len(ops) == 0 { + continue + } + + // We want to abandon this update if the database has been updated since + // we last fetched it. + databaseNotUpdated := clientv3.Compare(clientv3.Version(s.store.Database.Key(database.DatabaseID)), "=", database.Version()) + txn := s.store.Txn(ops...) + txn.AddConditions(databaseNotUpdated) + err = txn.Commit(ctx) + switch { + case errors.Is(err, storage.ErrOperationConstraintViolated): + logger.Warn().Msg("database modified while updating detected version. 
skipping update.") + case err != nil: + return fmt.Errorf("failed to update records for database '%s': %w", database.DatabaseID, err) + default: + logger.Info().Msg("successfully updated with detected versions") + } + } + + return nil +} + +func ReconcileVersions( + spec *StoredSpec, + instances []*StoredInstance, + statuses []*StoredInstanceStatus, +) (*StoredSpec, []*StoredInstance) { + instancesByNodeHost, updatedInstances := reconcileInstanceVersions(instances, statuses) + updatedSpec := reconcileNodeVersions(spec, instancesByNodeHost) + + return updatedSpec, updatedInstances +} + +type nodeHostKey struct { + nodeName string + hostID string +} + +func reconcileInstanceVersions( + instances []*StoredInstance, + statuses []*StoredInstanceStatus, +) (map[nodeHostKey]*StoredInstance, []*StoredInstance) { + var updatedInstances []*StoredInstance + statusesByID := make(map[string]*StoredInstanceStatus, len(statuses)) + for _, status := range statuses { + statusesByID[status.InstanceID] = status + } + instancesByNodeHost := make(map[nodeHostKey]*StoredInstance, len(instances)) + for _, instance := range instances { + status, ok := statusesByID[instance.InstanceID] + if !ok || status.Status.IsStale() { + continue + } + postgresVersion := utils.FromPointer(status.Status.PostgresVersion) + spockVersion := utils.FromPointer(status.Status.SpockVersion) + pgEdgeVersion, err := ds.ParsePgEdgeVersion(postgresVersion, spockVersion) + if err != nil { + continue + } + pgEdgeVersion, err = pgEdgeVersion.Normalize() + if err != nil { + continue + } + if !instance.PgEdgeVersion.Equals(pgEdgeVersion) { + instance.PgEdgeVersion = pgEdgeVersion + updatedInstances = append(updatedInstances, instance) + } + instancesByNodeHost[nodeHostKey{instance.NodeName, instance.HostID}] = instance + } + + return instancesByNodeHost, updatedInstances +} + +func observedNodeVersion( + node *Node, + instancesByNodeHost map[nodeHostKey]*StoredInstance, +) *ds.PgEdgeVersion { + var version 
*ds.PgEdgeVersion + for _, hostID := range node.HostIDs { + instance, ok := instancesByNodeHost[nodeHostKey{nodeName: node.Name, hostID: hostID}] + switch { + case !ok: + return nil + case version == nil: + version = instance.PgEdgeVersion + case !version.Equals(instance.PgEdgeVersion): + return nil + } + } + return version +} + +func reconcileNodeVersions( + spec *StoredSpec, + instancesByNodeHost map[nodeHostKey]*StoredInstance, +) *StoredSpec { + var updatedSpec *StoredSpec + var commonSpockVersion string + spockMatches := true + for _, node := range spec.Nodes { + currentPostgresVersion := node.PostgresVersion + if currentPostgresVersion == "" { + currentPostgresVersion = spec.PostgresVersion + } + observed := observedNodeVersion(node, instancesByNodeHost) + if observed == nil { + // we only want to update our spock version when _all_ nodes are + // observed to have the same spock version + spockMatches = false + continue + } + observedPostgresVersion := observed.PostgresVersion.String() + observedSpockVersion := observed.SpockVersion.String() + + if observedPostgresVersion != currentPostgresVersion { + node.PostgresVersion = observedPostgresVersion + // signals that we've modified the spec + updatedSpec = spec + } + if commonSpockVersion == "" { + commonSpockVersion = observedSpockVersion + } else if commonSpockVersion != observedSpockVersion { + spockMatches = false + } + } + if spockMatches && commonSpockVersion != "" && commonSpockVersion != spec.SpockVersion { + spec.SpockVersion = commonSpockVersion + updatedSpec = spec + } + if updatedSpec != nil { + updatedSpec.NormalizePostgresVersions() + if spockMatches && commonSpockVersion != "" { + updatedSpec.SpockVersion = commonSpockVersion + } + } + + return updatedSpec +} diff --git a/server/internal/database/reconcile_versions_test.go b/server/internal/database/reconcile_versions_test.go new file mode 100644 index 00000000..bdd7e700 --- /dev/null +++ b/server/internal/database/reconcile_versions_test.go @@ 
-0,0 +1,474 @@ +package database_test + +import ( + "testing" + "time" + + "github.com/pgEdge/control-plane/server/internal/database" + "github.com/pgEdge/control-plane/server/internal/ds" + "github.com/pgEdge/control-plane/server/internal/patroni" + "github.com/pgEdge/control-plane/server/internal/utils" + "github.com/stretchr/testify/assert" +) + +func TestReconcileVersions(t *testing.T) { + for _, tc := range []struct { + name string + spec *database.StoredSpec + instances []*database.StoredInstance + statuses []*database.StoredInstanceStatus + expectedSpec *database.StoredSpec + expectedInstances []*database.StoredInstance + }{ + { + name: "observed matches spec", + spec: &database.StoredSpec{ + Spec: &database.Spec{ + PostgresVersion: "17.4", + SpockVersion: "5", + Nodes: []*database.Node{ + {Name: "n1", HostIDs: []string{"host-1"}}, + }, + }, + }, + instances: []*database.StoredInstance{ + { + InstanceID: "n1-host-1", + NodeName: "n1", + HostID: "host-1", + PgEdgeVersion: ds.MustParsePgEdgeVersion("17.4", "5"), + }, + }, + statuses: []*database.StoredInstanceStatus{ + { + InstanceID: "n1-host-1", + Status: &database.InstanceStatus{ + StatusUpdatedAt: utils.PointerTo(time.Now()), + Role: utils.PointerTo(patroni.InstanceRolePrimary), + PostgresVersion: utils.PointerTo("17.4"), + SpockVersion: utils.PointerTo("5.0.6"), + }, + }, + }, + }, + { + name: "all nodes updated", + spec: &database.StoredSpec{ + Spec: &database.Spec{ + PostgresVersion: "17.4", + SpockVersion: "5", + Nodes: []*database.Node{ + {Name: "n1", HostIDs: []string{"host-1"}}, + {Name: "n2", HostIDs: []string{"host-2"}}, + }, + }, + }, + instances: []*database.StoredInstance{ + { + InstanceID: "n1-host-1", + NodeName: "n1", + HostID: "host-1", + PgEdgeVersion: ds.MustParsePgEdgeVersion("17.4", "5"), + }, + { + InstanceID: "n2-host-2", + NodeName: "n2", + HostID: "host-2", + PgEdgeVersion: ds.MustParsePgEdgeVersion("17.4", "5"), + }, + }, + statuses: []*database.StoredInstanceStatus{ + { + 
InstanceID: "n1-host-1", + Status: &database.InstanceStatus{ + StatusUpdatedAt: utils.PointerTo(time.Now()), + Role: utils.PointerTo(patroni.InstanceRolePrimary), + PostgresVersion: utils.PointerTo("17.5"), + SpockVersion: utils.PointerTo("6.0.0"), + }, + }, + { + InstanceID: "n2-host-2", + Status: &database.InstanceStatus{ + StatusUpdatedAt: utils.PointerTo(time.Now()), + Role: utils.PointerTo(patroni.InstanceRolePrimary), + PostgresVersion: utils.PointerTo("17.5"), + SpockVersion: utils.PointerTo("6.0.0"), + }, + }, + }, + expectedSpec: &database.StoredSpec{ + Spec: &database.Spec{ + PostgresVersion: "17.5", + SpockVersion: "6", + Nodes: []*database.Node{ + {Name: "n1", HostIDs: []string{"host-1"}}, + {Name: "n2", HostIDs: []string{"host-2"}}, + }, + }, + }, + expectedInstances: []*database.StoredInstance{ + { + InstanceID: "n1-host-1", + NodeName: "n1", + HostID: "host-1", + PgEdgeVersion: ds.MustParsePgEdgeVersion("17.5", "6"), + }, + { + InstanceID: "n2-host-2", + NodeName: "n2", + HostID: "host-2", + PgEdgeVersion: ds.MustParsePgEdgeVersion("17.5", "6"), + }, + }, + }, + { + name: "all nodes updated spock only", + spec: &database.StoredSpec{ + Spec: &database.Spec{ + PostgresVersion: "17.4", + SpockVersion: "5", + Nodes: []*database.Node{ + {Name: "n1", HostIDs: []string{"host-1"}}, + {Name: "n2", HostIDs: []string{"host-2"}}, + }, + }, + }, + instances: []*database.StoredInstance{ + { + InstanceID: "n1-host-1", + NodeName: "n1", + HostID: "host-1", + PgEdgeVersion: ds.MustParsePgEdgeVersion("17.4", "5"), + }, + { + InstanceID: "n2-host-2", + NodeName: "n2", + HostID: "host-2", + PgEdgeVersion: ds.MustParsePgEdgeVersion("17.4", "5"), + }, + }, + statuses: []*database.StoredInstanceStatus{ + { + InstanceID: "n1-host-1", + Status: &database.InstanceStatus{ + StatusUpdatedAt: utils.PointerTo(time.Now()), + Role: utils.PointerTo(patroni.InstanceRolePrimary), + PostgresVersion: utils.PointerTo("17.4"), + SpockVersion: utils.PointerTo("6.0.0"), + }, + }, + { + 
InstanceID: "n2-host-2", + Status: &database.InstanceStatus{ + StatusUpdatedAt: utils.PointerTo(time.Now()), + Role: utils.PointerTo(patroni.InstanceRolePrimary), + PostgresVersion: utils.PointerTo("17.4"), + SpockVersion: utils.PointerTo("6.0.0"), + }, + }, + }, + expectedSpec: &database.StoredSpec{ + Spec: &database.Spec{ + PostgresVersion: "17.4", + SpockVersion: "6", + Nodes: []*database.Node{ + {Name: "n1", HostIDs: []string{"host-1"}}, + {Name: "n2", HostIDs: []string{"host-2"}}, + }, + }, + }, + expectedInstances: []*database.StoredInstance{ + { + InstanceID: "n1-host-1", + NodeName: "n1", + HostID: "host-1", + PgEdgeVersion: ds.MustParsePgEdgeVersion("17.4", "6"), + }, + { + InstanceID: "n2-host-2", + NodeName: "n2", + HostID: "host-2", + PgEdgeVersion: ds.MustParsePgEdgeVersion("17.4", "6"), + }, + }, + }, + { + name: "all nodes updated with override", + spec: &database.StoredSpec{ + Spec: &database.Spec{ + PostgresVersion: "17.4", + SpockVersion: "5", + Nodes: []*database.Node{ + {Name: "n1", HostIDs: []string{"host-1"}, PostgresVersion: "17.5"}, + {Name: "n2", HostIDs: []string{"host-2"}}, + }, + }, + }, + instances: []*database.StoredInstance{ + { + InstanceID: "n1-host-1", + NodeName: "n1", + HostID: "host-1", + PgEdgeVersion: ds.MustParsePgEdgeVersion("17.4", "5"), + }, + { + InstanceID: "n2-host-2", + NodeName: "n2", + HostID: "host-2", + PgEdgeVersion: ds.MustParsePgEdgeVersion("17.4", "5"), + }, + }, + statuses: []*database.StoredInstanceStatus{ + { + InstanceID: "n1-host-1", + Status: &database.InstanceStatus{ + StatusUpdatedAt: utils.PointerTo(time.Now()), + Role: utils.PointerTo(patroni.InstanceRolePrimary), + PostgresVersion: utils.PointerTo("17.5"), + SpockVersion: utils.PointerTo("6.0.0"), + }, + }, + { + InstanceID: "n2-host-2", + Status: &database.InstanceStatus{ + StatusUpdatedAt: utils.PointerTo(time.Now()), + Role: utils.PointerTo(patroni.InstanceRolePrimary), + PostgresVersion: utils.PointerTo("17.5"), + SpockVersion: 
utils.PointerTo("6.0.0"), + }, + }, + }, + expectedSpec: &database.StoredSpec{ + Spec: &database.Spec{ + PostgresVersion: "17.5", + SpockVersion: "6", + Nodes: []*database.Node{ + {Name: "n1", HostIDs: []string{"host-1"}}, + {Name: "n2", HostIDs: []string{"host-2"}}, + }, + }, + }, + expectedInstances: []*database.StoredInstance{ + { + InstanceID: "n1-host-1", + NodeName: "n1", + HostID: "host-1", + PgEdgeVersion: ds.MustParsePgEdgeVersion("17.5", "6"), + }, + { + InstanceID: "n2-host-2", + NodeName: "n2", + HostID: "host-2", + PgEdgeVersion: ds.MustParsePgEdgeVersion("17.5", "6"), + }, + }, + }, + { + name: "one node updated", + spec: &database.StoredSpec{ + Spec: &database.Spec{ + PostgresVersion: "17.4", + SpockVersion: "5", + Nodes: []*database.Node{ + {Name: "n1", HostIDs: []string{"host-1"}}, + {Name: "n2", HostIDs: []string{"host-2"}}, + }, + }, + }, + instances: []*database.StoredInstance{ + { + InstanceID: "n1-host-1", + NodeName: "n1", + HostID: "host-1", + PgEdgeVersion: ds.MustParsePgEdgeVersion("17.4", "5"), + }, + { + InstanceID: "n2-host-2", + NodeName: "n2", + HostID: "host-2", + PgEdgeVersion: ds.MustParsePgEdgeVersion("17.4", "5"), + }, + }, + statuses: []*database.StoredInstanceStatus{ + { + InstanceID: "n1-host-1", + Status: &database.InstanceStatus{ + StatusUpdatedAt: utils.PointerTo(time.Now()), + Role: utils.PointerTo(patroni.InstanceRolePrimary), + PostgresVersion: utils.PointerTo("17.4"), + SpockVersion: utils.PointerTo("5.0.6"), + }, + }, + { + InstanceID: "n2-host-2", + Status: &database.InstanceStatus{ + StatusUpdatedAt: utils.PointerTo(time.Now()), + Role: utils.PointerTo(patroni.InstanceRolePrimary), + PostgresVersion: utils.PointerTo("17.5"), + SpockVersion: utils.PointerTo("6.0.0"), + }, + }, + }, + expectedSpec: &database.StoredSpec{ + Spec: &database.Spec{ + PostgresVersion: "17.4", + SpockVersion: "5", + Nodes: []*database.Node{ + {Name: "n1", HostIDs: []string{"host-1"}}, + {Name: "n2", HostIDs: []string{"host-2"}, 
PostgresVersion: "17.5"}, + }, + }, + }, + expectedInstances: []*database.StoredInstance{ + { + InstanceID: "n2-host-2", + NodeName: "n2", + HostID: "host-2", + PgEdgeVersion: ds.MustParsePgEdgeVersion("17.5", "6"), + }, + }, + }, + { + name: "one node updated, one with stale status", + spec: &database.StoredSpec{ + Spec: &database.Spec{ + PostgresVersion: "17.4", + SpockVersion: "5", + Nodes: []*database.Node{ + {Name: "n1", HostIDs: []string{"host-1"}}, + {Name: "n2", HostIDs: []string{"host-2"}}, + }, + }, + }, + instances: []*database.StoredInstance{ + { + InstanceID: "n1-host-1", + NodeName: "n1", + HostID: "host-1", + PgEdgeVersion: ds.MustParsePgEdgeVersion("17.4", "5"), + }, + { + InstanceID: "n2-host-2", + NodeName: "n2", + HostID: "host-2", + PgEdgeVersion: ds.MustParsePgEdgeVersion("17.4", "5"), + }, + }, + statuses: []*database.StoredInstanceStatus{ + { + InstanceID: "n1-host-1", + Status: &database.InstanceStatus{ + StatusUpdatedAt: utils.PointerTo(time.Now().Add(-3 * database.InstanceMonitorRefreshInterval)), + Role: utils.PointerTo(patroni.InstanceRolePrimary), + PostgresVersion: utils.PointerTo("17.5"), + SpockVersion: utils.PointerTo("6.0.0"), + }, + }, + { + InstanceID: "n2-host-2", + Status: &database.InstanceStatus{ + StatusUpdatedAt: utils.PointerTo(time.Now()), + Role: utils.PointerTo(patroni.InstanceRolePrimary), + PostgresVersion: utils.PointerTo("17.5"), + SpockVersion: utils.PointerTo("6.0.0"), + }, + }, + }, + expectedSpec: &database.StoredSpec{ + Spec: &database.Spec{ + PostgresVersion: "17.4", + SpockVersion: "5", + Nodes: []*database.Node{ + {Name: "n1", HostIDs: []string{"host-1"}}, + {Name: "n2", HostIDs: []string{"host-2"}, PostgresVersion: "17.5"}, + }, + }, + }, + expectedInstances: []*database.StoredInstance{ + { + InstanceID: "n2-host-2", + NodeName: "n2", + HostID: "host-2", + PgEdgeVersion: ds.MustParsePgEdgeVersion("17.5", "6"), + }, + }, + }, + { + name: "not all instances updated", + spec: &database.StoredSpec{ + Spec: 
&database.Spec{ + PostgresVersion: "17.4", + SpockVersion: "5", + Nodes: []*database.Node{ + {Name: "n1", HostIDs: []string{"host-1", "host-2"}}, + }, + }, + }, + instances: []*database.StoredInstance{ + { + InstanceID: "n1-host-1", + NodeName: "n1", + HostID: "host-1", + PgEdgeVersion: ds.MustParsePgEdgeVersion("17.4", "5"), + }, + { + InstanceID: "n1-host-2", + NodeName: "n1", + HostID: "host-2", + PgEdgeVersion: ds.MustParsePgEdgeVersion("17.4", "5"), + }, + }, + statuses: []*database.StoredInstanceStatus{ + { + InstanceID: "n1-host-1", + Status: &database.InstanceStatus{ + StatusUpdatedAt: utils.PointerTo(time.Now()), + Role: utils.PointerTo(patroni.InstanceRoleReplica), + PostgresVersion: utils.PointerTo("17.4"), + SpockVersion: utils.PointerTo("5.0.6"), + }, + }, + { + InstanceID: "n1-host-2", + Status: &database.InstanceStatus{ + StatusUpdatedAt: utils.PointerTo(time.Now()), + Role: utils.PointerTo(patroni.InstanceRoleReplica), + PostgresVersion: utils.PointerTo("17.5"), + SpockVersion: utils.PointerTo("6.0.0"), + }, + }, + }, + expectedInstances: []*database.StoredInstance{ + { + InstanceID: "n1-host-2", + NodeName: "n1", + HostID: "host-2", + PgEdgeVersion: ds.MustParsePgEdgeVersion("17.5", "6"), + }, + }, + }, + { + name: "no instances", + spec: &database.StoredSpec{ + Spec: &database.Spec{ + PostgresVersion: "17.4", + SpockVersion: "5", + Nodes: []*database.Node{ + {Name: "n1"}, + }, + }, + }, + }, + } { + t.Run(tc.name, func(t *testing.T) { + updatedSpec, updatedInstances := database.ReconcileVersions( + tc.spec, + tc.instances, + tc.statuses, + ) + assert.Equal(t, tc.expectedSpec, updatedSpec) + assert.Equal(t, tc.expectedInstances, updatedInstances) + }) + } +} diff --git a/server/internal/database/service.go b/server/internal/database/service.go index 118f4786..d1bc1590 100644 --- a/server/internal/database/service.go +++ b/server/internal/database/service.go @@ -9,10 +9,12 @@ import ( "time" "github.com/google/uuid" + "github.com/rs/zerolog" 
"github.com/pgEdge/control-plane/server/internal/config" "github.com/pgEdge/control-plane/server/internal/ds" "github.com/pgEdge/control-plane/server/internal/host" + "github.com/pgEdge/control-plane/server/internal/logging" "github.com/pgEdge/control-plane/server/internal/pgbackrest" "github.com/pgEdge/control-plane/server/internal/ports" "github.com/pgEdge/control-plane/server/internal/storage" @@ -36,6 +38,7 @@ type Service struct { store *Store hostSvc *host.Service portsSvc *ports.Service + logger zerolog.Logger } func NewService( @@ -44,6 +47,7 @@ func NewService( store *Store, hostSvc *host.Service, portsSvc *ports.Service, + loggerFactory *logging.Factory, ) *Service { return &Service{ cfg: cfg, @@ -51,6 +55,7 @@ func NewService( store: store, hostSvc: hostSvc, portsSvc: portsSvc, + logger: loggerFactory.Logger(logging.ComponentDatabaseService), } } diff --git a/server/internal/database/spec.go b/server/internal/database/spec.go index d3985b5e..eb921b87 100644 --- a/server/internal/database/spec.go +++ b/server/internal/database/spec.go @@ -473,6 +473,30 @@ func (s *Spec) RemoveHost(hostId string) (ok bool) { return ok } +// NormalizePostgresVersions checks if all nodes have an equal postgres version +// and, if so, sets the top-level postgres version to that version and sets each +// node's postgres version to an empty string. 
+func (s *Spec) NormalizePostgresVersions() { + var common string + for _, node := range s.Nodes { + nodeVersion := node.PostgresVersion + if nodeVersion == "" { + nodeVersion = s.PostgresVersion + } + if common == "" { + common = nodeVersion + } else if nodeVersion != common { + return + } + } + if common != "" { + s.PostgresVersion = common + for _, node := range s.Nodes { + node.PostgresVersion = "" + } + } +} + func (s Spec) defaultOptionalFieldFromNodes(other []*Node) { otherNodesByName := make(map[string]*Node) for _, n := range other { diff --git a/server/internal/logging/factory.go b/server/internal/logging/factory.go index 30e4c5aa..dbe9ede5 100644 --- a/server/internal/logging/factory.go +++ b/server/internal/logging/factory.go @@ -16,6 +16,7 @@ func (c Component) String() string { const ( ComponentAPIServer Component = "api_server" + ComponentDatabaseService Component = "database_service" ComponentElectionCandidate Component = "election_candidate" ComponentEmbeddedEtcd Component = "embedded_etcd" ComponentMigration Component = "migration" From c8e0eddbaa1fa5972537407f05f45946805cbcfc Mon Sep 17 00:00:00 2001 From: Jason Lynch Date: Mon, 4 May 2026 14:32:08 -0400 Subject: [PATCH 09/15] feat: add databases monitor Adds a 'databases' monitor to periodically run the database version reconciliation process. The monitor's interval is configurable and can be set to '0' to disable it entirely. This setting is useful in testing and in clusters with a large number of databases where the operator knows that the database versions will never change outside of the Control Plane API. 
PLAT-512 --- server/internal/config/config.go | 76 ++++++++++---------- server/internal/monitor/databases_monitor.go | 66 +++++++++++++++++ server/internal/monitor/provide.go | 14 +++- server/internal/monitor/service.go | 18 +++++ 4 files changed, 136 insertions(+), 38 deletions(-) create mode 100644 server/internal/monitor/databases_monitor.go diff --git a/server/internal/config/config.go b/server/internal/config/config.go index 55bdf9c0..c4059ebe 100644 --- a/server/internal/config/config.go +++ b/server/internal/config/config.go @@ -245,29 +245,30 @@ var defaultRandomPorts = RandomPorts{ } type Config struct { - TenantID string `koanf:"tenant_id" json:"tenant_id,omitempty"` - HostID string `koanf:"host_id" json:"host_id,omitempty"` - Orchestrator Orchestrator `koanf:"orchestrator" json:"orchestrator,omitempty"` - DataDir string `koanf:"data_dir" json:"data_dir,omitempty"` - PeerAddresses []string `koanf:"peer_addresses" json:"peer_addresses,omitempty"` - ClientAddresses []string `koanf:"client_addresses" json:"client_addresses,omitempty"` - StopGracePeriodSeconds int64 `koanf:"stop_grace_period_seconds" json:"stop_grace_period_seconds,omitempty"` - MQTT MQTT `koanf:"mqtt" json:"mqtt,omitzero"` - HTTP HTTP `koanf:"http" json:"http,omitzero"` - Logging Logging `koanf:"logging" json:"logging,omitzero"` - EtcdMode EtcdMode `koanf:"etcd_mode" json:"etcd_mode,omitempty"` - EtcdUsername string `koanf:"etcd_username" json:"etcd_username,omitempty"` - EtcdPassword string `koanf:"etcd_password" json:"etcd_password,omitempty"` - EtcdKeyRoot string `koanf:"etcd_key_root" json:"etcd_key_root,omitempty"` - EtcdServer EtcdServer `koanf:"etcd_server" json:"etcd_server,omitzero"` - EtcdClient EtcdClient `koanf:"etcd_client" json:"etcd_client,omitzero"` - TraefikEnabled bool `koanf:"traefik_enabled" json:"traefik_enabled,omitempty"` - VectorEnabled bool `koanf:"vector_enabled" json:"vector_enabled,omitempty"` - DockerSwarm DockerSwarm `koanf:"docker_swarm" 
json:"docker_swarm,omitzero"` - SystemD SystemD `koanf:"systemd" json:"systemd,omitzero"` - DatabaseOwnerUID int `koanf:"database_owner_uid" json:"database_owner_uid,omitempty"` - ProfilingEnabled bool `koanf:"profiling_enabled" json:"profiling_enabled,omitempty"` - RandomPorts RandomPorts `koanf:"random_ports" json:"random_ports,omitzero"` + TenantID string `koanf:"tenant_id" json:"tenant_id,omitempty"` + HostID string `koanf:"host_id" json:"host_id,omitempty"` + Orchestrator Orchestrator `koanf:"orchestrator" json:"orchestrator,omitempty"` + DataDir string `koanf:"data_dir" json:"data_dir,omitempty"` + PeerAddresses []string `koanf:"peer_addresses" json:"peer_addresses,omitempty"` + ClientAddresses []string `koanf:"client_addresses" json:"client_addresses,omitempty"` + StopGracePeriodSeconds int64 `koanf:"stop_grace_period_seconds" json:"stop_grace_period_seconds,omitempty"` + MQTT MQTT `koanf:"mqtt" json:"mqtt,omitzero"` + HTTP HTTP `koanf:"http" json:"http,omitzero"` + Logging Logging `koanf:"logging" json:"logging,omitzero"` + EtcdMode EtcdMode `koanf:"etcd_mode" json:"etcd_mode,omitempty"` + EtcdUsername string `koanf:"etcd_username" json:"etcd_username,omitempty"` + EtcdPassword string `koanf:"etcd_password" json:"etcd_password,omitempty"` + EtcdKeyRoot string `koanf:"etcd_key_root" json:"etcd_key_root,omitempty"` + EtcdServer EtcdServer `koanf:"etcd_server" json:"etcd_server,omitzero"` + EtcdClient EtcdClient `koanf:"etcd_client" json:"etcd_client,omitzero"` + TraefikEnabled bool `koanf:"traefik_enabled" json:"traefik_enabled,omitempty"` + VectorEnabled bool `koanf:"vector_enabled" json:"vector_enabled,omitempty"` + DockerSwarm DockerSwarm `koanf:"docker_swarm" json:"docker_swarm,omitzero"` + SystemD SystemD `koanf:"systemd" json:"systemd,omitzero"` + DatabaseOwnerUID int `koanf:"database_owner_uid" json:"database_owner_uid,omitempty"` + ProfilingEnabled bool `koanf:"profiling_enabled" json:"profiling_enabled,omitempty"` + RandomPorts RandomPorts 
`koanf:"random_ports" json:"random_ports,omitzero"` + DatabasesMonitorIntervalSeconds uint64 `koanf:"databases_monitor_interval_seconds" json:"databases_monitor_interval_seconds,omitempty"` } // ClientAddress is a convenience function to return the first client address. @@ -405,20 +406,21 @@ func DefaultConfig() (Config, error) { } return Config{ - HostID: hostID, - Orchestrator: OrchestratorSwarm, - EtcdMode: EtcdModeServer, - PeerAddresses: addresses, - ClientAddresses: addresses, - Logging: loggingDefault, - HTTP: httpDefault, - StopGracePeriodSeconds: 30, - EtcdServer: etcdServerDefault, - EtcdClient: etcdClientDefault, - DockerSwarm: defaultDockerSwarm, - SystemD: defaultSystemD, - DatabaseOwnerUID: 26, - RandomPorts: defaultRandomPorts, + HostID: hostID, + Orchestrator: OrchestratorSwarm, + EtcdMode: EtcdModeServer, + PeerAddresses: addresses, + ClientAddresses: addresses, + Logging: loggingDefault, + HTTP: httpDefault, + StopGracePeriodSeconds: 30, + EtcdServer: etcdServerDefault, + EtcdClient: etcdClientDefault, + DockerSwarm: defaultDockerSwarm, + SystemD: defaultSystemD, + DatabaseOwnerUID: 26, + RandomPorts: defaultRandomPorts, + DatabasesMonitorIntervalSeconds: 30, }, nil } diff --git a/server/internal/monitor/databases_monitor.go b/server/internal/monitor/databases_monitor.go new file mode 100644 index 00000000..c0a2babf --- /dev/null +++ b/server/internal/monitor/databases_monitor.go @@ -0,0 +1,66 @@ +package monitor + +import ( + "context" + "fmt" + "time" + + "github.com/rs/zerolog" + + "github.com/pgEdge/control-plane/server/internal/config" + "github.com/pgEdge/control-plane/server/internal/database" + "github.com/pgEdge/control-plane/server/internal/election" +) + +type DatabasesMonitor struct { + monitor *Monitor + svc *database.Service + candidate *election.Candidate +} + +func NewDatabasesMonitor( + logger zerolog.Logger, + svc *database.Service, + candidate *election.Candidate, + cfg config.Config, +) *DatabasesMonitor { + m := 
&DatabasesMonitor{ + svc: svc, + candidate: candidate, + } + interval := time.Duration(cfg.DatabasesMonitorIntervalSeconds) * time.Second + m.monitor = NewMonitor(logger, interval, m.update) + return m +} + +func (m *DatabasesMonitor) Start(ctx context.Context) error { + if err := m.candidate.Start(ctx); err != nil { + return fmt.Errorf("failed to start candidate: %w", err) + } + m.monitor.Start(ctx) + + return nil +} + +func (m *DatabasesMonitor) Stop() error { + ctx, cancel := context.WithTimeout(context.Background(), electionTTL/3) + defer cancel() + + m.monitor.Stop() + if err := m.candidate.Stop(ctx); err != nil { + return fmt.Errorf("failed to stop candidate: %w", err) + } + + return nil +} + +func (m *DatabasesMonitor) update(ctx context.Context) error { + if !m.candidate.IsLeader() { + return nil + } + if err := m.svc.ReconcileAllDatabaseVersions(ctx); err != nil { + return fmt.Errorf("failed to reconcile database versions: %w", err) + } + + return nil +} diff --git a/server/internal/monitor/provide.go b/server/internal/monitor/provide.go index 858274ff..7e197a39 100644 --- a/server/internal/monitor/provide.go +++ b/server/internal/monitor/provide.go @@ -1,6 +1,8 @@ package monitor import ( + "time" + "github.com/rs/zerolog" "github.com/samber/do" clientv3 "go.etcd.io/etcd/client/v3" @@ -8,9 +10,13 @@ import ( "github.com/pgEdge/control-plane/server/internal/certificates" "github.com/pgEdge/control-plane/server/internal/config" "github.com/pgEdge/control-plane/server/internal/database" + "github.com/pgEdge/control-plane/server/internal/election" "github.com/pgEdge/control-plane/server/internal/host" ) +const electionName election.Name = "databases-monitor" +const electionTTL time.Duration = 30 * time.Second + func Provide(i *do.Injector) { provideStore(i) provideService(i) @@ -46,7 +52,13 @@ func provideService(i *do.Injector) { if err != nil { return nil, err } - return NewService(cfg, logger, dbSvc, certSvc, dbOrch, store, hostSvc), nil + electionSvc, err 
:= do.Invoke[*election.Service](i) + if err != nil { + return nil, err + } + + candidate := electionSvc.NewCandidate(electionName, cfg.HostID, electionTTL) + return NewService(cfg, logger, dbSvc, certSvc, dbOrch, store, hostSvc, candidate), nil }) } diff --git a/server/internal/monitor/service.go b/server/internal/monitor/service.go index cef1b0d1..b3d4d022 100644 --- a/server/internal/monitor/service.go +++ b/server/internal/monitor/service.go @@ -10,6 +10,7 @@ import ( "github.com/pgEdge/control-plane/server/internal/certificates" "github.com/pgEdge/control-plane/server/internal/config" "github.com/pgEdge/control-plane/server/internal/database" + "github.com/pgEdge/control-plane/server/internal/election" "github.com/pgEdge/control-plane/server/internal/host" ) @@ -26,6 +27,7 @@ type Service struct { hostMonitor *HostMonitor instances map[string]*InstanceMonitor serviceInstances map[string]*ServiceInstanceMonitor + databasesMonitor *DatabasesMonitor } func NewService( @@ -36,7 +38,12 @@ func NewService( dbOrch database.Orchestrator, store *Store, hostSvc *host.Service, + candidate *election.Candidate, ) *Service { + var databasesMonitor *DatabasesMonitor + if cfg.DatabasesMonitorIntervalSeconds > 0 { + databasesMonitor = NewDatabasesMonitor(logger, dbSvc, candidate, cfg) + } return &Service{ cfg: cfg, logger: logger, @@ -47,6 +54,7 @@ func NewService( instances: map[string]*InstanceMonitor{}, serviceInstances: map[string]*ServiceInstanceMonitor{}, hostMonitor: NewHostMonitor(logger, hostSvc), + databasesMonitor: databasesMonitor, } } @@ -57,6 +65,11 @@ func (s *Service) Start(ctx context.Context) error { // the lifetime of a single operation. s.appCtx = ctx s.hostMonitor.Start(ctx) + if s.databasesMonitor != nil { + if err := s.databasesMonitor.Start(ctx); err != nil { + return fmt.Errorf("failed to start databases monitor: %w", err) + } + } stored, err := s.store.InstanceMonitor. GetAllByHostID(s.cfg.HostID). 
@@ -106,6 +119,11 @@ func (s *Service) Shutdown() error { s.serviceInstances = map[string]*ServiceInstanceMonitor{} s.hostMonitor.Stop() + if s.databasesMonitor != nil { + if err := s.databasesMonitor.Stop(); err != nil { + return fmt.Errorf("failed to stop databases monitor: %w", err) + } + } return nil } From 2bcf2299d9d8a6a7c89f1e03ef1c799ef898bfec Mon Sep 17 00:00:00 2001 From: Jason Lynch Date: Mon, 4 May 2026 14:36:28 -0400 Subject: [PATCH 10/15] test: add cluster test for external db upgrades Adds a cluster test to verify the behavior of the new database version reconciliation process. PLAT-512 --- Makefile | 2 +- clustertest/external_upgrade_test.go | 292 +++++++++++++++++++++++++++ clustertest/host_test.go | 2 +- clustertest/utils_test.go | 60 +++--- 4 files changed, 325 insertions(+), 31 deletions(-) create mode 100644 clustertest/external_upgrade_test.go diff --git a/Makefile b/Makefile index a12c4a31..94a059e9 100644 --- a/Makefile +++ b/Makefile @@ -43,7 +43,7 @@ e2e_args=-tags=e2e_test -count=1 -timeout=45m \ $(if $(filter 1,$(E2E_DEBUG)),-debug) \ $(if $(E2E_DEBUG_DIR),-debug-dir $(E2E_DEBUG_DIR)) -cluster_test_args=-tags=cluster_test -count=1 -timeout=10m \ +cluster_test_args=-tags=cluster_test -count=1 -timeout=15m \ $(if $(CLUSTER_TEST_PARALLEL),-parallel $(CLUSTER_TEST_PARALLEL)) \ $(if $(CLUSTER_TEST_RUN),-run $(CLUSTER_TEST_RUN)) \ -args \ diff --git a/clustertest/external_upgrade_test.go b/clustertest/external_upgrade_test.go new file mode 100644 index 00000000..d857aa69 --- /dev/null +++ b/clustertest/external_upgrade_test.go @@ -0,0 +1,292 @@ +//go:build cluster_test + +package clustertest + +import ( + "context" + "fmt" + "testing" + "time" + + "github.com/stretchr/testify/require" + + controlplane "github.com/pgEdge/control-plane/api/apiv1/gen/control_plane" + "github.com/pgEdge/control-plane/client" +) + +func TestExternalUpgrade(t *testing.T) { + // Tests that the control plane updates its records when the user upgrades + // a database 
outside of our API. + t.Parallel() + ctx := t.Context() + + const ( + startPostgresVersion string = "18.2" + upgradePostgresVersion string = "18.3" + upgradeImage string = "ghcr.io/pgedge/pgedge-postgres:18.3-spock5.0.6-standard-1" + spockVersion string = "5" + sleepDuration time.Duration = 5 * time.Second + ) + + // Helper functions + assertSpecVersions := func(t *testing.T, spec *controlplane.DatabaseSpec, expectedSpecVersion string, expectedNodeVersions map[string]string) { + t.Helper() + + actualNodeVersions := make(map[string]string, len(spec.Nodes)) + for _, node := range spec.Nodes { + var version string + if node.PostgresVersion != nil { + version = *node.PostgresVersion + } + actualNodeVersions[node.Name] = version + } + var actualSpecVersion string + if spec.PostgresVersion != nil { + actualSpecVersion = *spec.PostgresVersion + } + require.Equal(t, expectedSpecVersion, actualSpecVersion) + require.Equal(t, expectedNodeVersions, actualNodeVersions) + } + assertInstanceVersions := func(t *testing.T, instances []*controlplane.Instance, expectedNodeHostVersions map[string]map[string]string) { + t.Helper() + + actualNodeHostVersions := map[string]map[string]string{} + for _, instance := range instances { + require.Equal(t, client.InstanceStateAvailable, instance.State) + + if _, ok := actualNodeHostVersions[instance.NodeName]; !ok { + actualNodeHostVersions[instance.NodeName] = map[string]string{} + } + var version string + if instance.Postgres.Version != nil { + version = *instance.Postgres.Version + } + actualNodeHostVersions[instance.NodeName][instance.HostID] = version + } + require.Equal(t, expectedNodeHostVersions, actualNodeHostVersions) + } + upgradeService := func(t *testing.T, databaseID, nodeName, hostID string) { + t.Helper() + + tLogf(t, "upgrading %s %s instance", nodeName, hostID) + + ctx, cancel := context.WithTimeout(t.Context(), 30*time.Second) + defer cancel() + + serviceName := dockerCmd(t, ctx, + "service", + "ls", + 
fmt.Sprintf("--filter=label=pgedge.database.id=%s", databaseID), + fmt.Sprintf("--filter=label=pgedge.node.name=%s", nodeName), + fmt.Sprintf("--filter=label=pgedge.host.id=%s", hostID), + "--format={{.Name}}", + ) + require.NotEmpty(t, serviceName) + dockerCmd(t, ctx, + "service", + "update", + fmt.Sprintf("--image=%s", upgradeImage), + // disabling healthchecks to speed up startup time + "--no-healthcheck", + serviceName, + ) + } + + env := map[string]string{ + "PGEDGE_DATABASES_MONITOR_INTERVAL_SECONDS": "3", + } + cluster := NewCluster(t, ClusterConfig{ + Hosts: []HostConfig{ + {ID: "host-1", ExtraEnv: env}, + {ID: "host-2", ExtraEnv: env}, + {ID: "host-3", ExtraEnv: env}, + }, + }) + cluster.Init(t) + + spec := &controlplane.DatabaseSpec{ + DatabaseName: "test_upgrade", + PostgresVersion: pointerTo(startPostgresVersion), + SpockVersion: pointerTo(spockVersion), + Nodes: []*controlplane.DatabaseNodeSpec{ + { + Name: "n1", + HostIds: []controlplane.Identifier{"host-1", "host-2"}, + }, + { + Name: "n2", + HostIds: []controlplane.Identifier{"host-3"}, + }, + }, + } + + tLog(t, "creating database") + + createResp, err := cluster.Client().CreateDatabase(ctx, &controlplane.CreateDatabaseRequest{ + Spec: spec, + }) + require.NoError(t, err) + + databaseID := createResp.Database.ID + + t.Cleanup(func() { + // Use a new context for cleanup operations since t.Context is canceled. 
+ ctx, cancel := context.WithTimeout(context.Background(), 90*time.Second) + defer cancel() + + if testConfig.skipCleanup { + tLogf(t, "skipping cleanup for database '%s'", databaseID) + return + } + + tLogf(t, "cleaning up database '%s'", databaseID) + + resp, err := cluster.Client().DeleteDatabase(ctx, &controlplane.DeleteDatabasePayload{ + DatabaseID: databaseID, + }) + if err != nil { + tLogf(t, "failed to delete database '%s': %v", databaseID, err) + return + } + + tLog(t, "waiting for database deletion to complete") + + err = waitForTaskComplete(ctx, cluster.Client(), databaseID, resp.Task.TaskID, time.Minute) + if err != nil { + tLogf(t, "failed while waiting for database deletion '%s'", databaseID) + return + } + }) + + tLog(t, "waiting for database creation to complete") + + err = waitForTaskComplete(ctx, cluster.Client(), databaseID, createResp.Task.TaskID, 3*time.Minute) + require.NoError(t, err) + + tLog(t, "sleeping to allow instance monitor interval to complete") + + time.Sleep(sleepDuration) + + tLogf(t, "asserting that all instances and spec versions are %s", startPostgresVersion) + + db, err := cluster.Client().GetDatabase(ctx, &controlplane.GetDatabasePayload{ + DatabaseID: databaseID, + }) + require.NoError(t, err) + + assertSpecVersions(t, db.Spec, startPostgresVersion, map[string]string{ + "n1": "", + "n2": "", + }) + assertInstanceVersions(t, db.Instances, map[string]map[string]string{ + "n1": map[string]string{ + "host-1": startPostgresVersion, + "host-2": startPostgresVersion, + }, + "n2": map[string]string{ + "host-3": startPostgresVersion, + }, + }) + + tLog(t, "getting database docker service names") + + upgradeService(t, string(databaseID), "n1", "host-2") + upgradeService(t, string(databaseID), "n2", "host-3") + + tLog(t, "sleeping to allow instance monitor interval and version reconciliation to complete") + + time.Sleep(sleepDuration) + + tLogf(t, "asserting that n2 is %s in the spec and that the n1-host-2 and n2-host-3 instances are 
%s", upgradePostgresVersion, upgradePostgresVersion) + + db, err = cluster.Client().GetDatabase(ctx, &controlplane.GetDatabasePayload{ + DatabaseID: databaseID, + }) + require.NoError(t, err) + + assertSpecVersions(t, db.Spec, startPostgresVersion, map[string]string{ + "n1": "", + "n2": upgradePostgresVersion, + }) + assertInstanceVersions(t, db.Instances, map[string]map[string]string{ + "n1": map[string]string{ + "host-1": startPostgresVersion, + "host-2": upgradePostgresVersion, + }, + "n2": map[string]string{ + "host-3": upgradePostgresVersion, + }, + }) + + upgradeService(t, string(databaseID), "n1", "host-1") + + tLog(t, "sleeping to allow monitor interval and version reconciliation to complete") + + time.Sleep(sleepDuration) + + tLogf(t, "asserting the top-level version is %s and that all instances are %s", upgradePostgresVersion, upgradePostgresVersion) + + db, err = cluster.Client().GetDatabase(ctx, &controlplane.GetDatabasePayload{ + DatabaseID: databaseID, + }) + require.NoError(t, err) + + assertSpecVersions(t, db.Spec, upgradePostgresVersion, map[string]string{ + "n1": "", + "n2": "", + }) + assertInstanceVersions(t, db.Instances, map[string]map[string]string{ + "n1": map[string]string{ + "host-1": upgradePostgresVersion, + "host-2": upgradePostgresVersion, + }, + "n2": map[string]string{ + "host-3": upgradePostgresVersion, + }, + }) + + tLog(t, "performing a no-op update") + + // We still expect to see some resource updates in the logs because the + // version number shows up in a few resources states. This does trigger a + // patroni reload in Swarm databases, which eats up time, but no actual + // changes should occur. 
+ + updateResp, err := cluster.Client().UpdateDatabase(ctx, &controlplane.UpdateDatabasePayload{ + DatabaseID: databaseID, + Request: &controlplane.UpdateDatabaseRequest{ + Spec: db.Spec, + }, + }) + require.NoError(t, err) + + tLog(t, "waiting for database update to complete") + + err = waitForTaskComplete(ctx, cluster.Client(), databaseID, updateResp.Task.TaskID, 3*time.Minute) + require.NoError(t, err) + + tLog(t, "sleeping to allow instance monitor interval to complete") + + time.Sleep(sleepDuration) + + tLog(t, "asserting that top-level versions have not changed") + + db, err = cluster.Client().GetDatabase(ctx, &controlplane.GetDatabasePayload{ + DatabaseID: databaseID, + }) + require.NoError(t, err) + + assertSpecVersions(t, db.Spec, upgradePostgresVersion, map[string]string{ + "n1": "", + "n2": "", + }) + assertInstanceVersions(t, db.Instances, map[string]map[string]string{ + "n1": map[string]string{ + "host-1": upgradePostgresVersion, + "host-2": upgradePostgresVersion, + }, + "n2": map[string]string{ + "host-3": upgradePostgresVersion, + }, + }) +} diff --git a/clustertest/host_test.go b/clustertest/host_test.go index 375cfdca..75af6f05 100644 --- a/clustertest/host_test.go +++ b/clustertest/host_test.go @@ -294,7 +294,7 @@ func printContainerLogs(ctx context.Context, t testing.TB, hostID string, contai tLog(t, "container is nil") return } - logs, err := containerLogs(t.Context(), t, container) + logs, err := containerLogs(ctx, t, container) if err != nil { tLogf(t, "failed to extract container logs: %s", err) } else { diff --git a/clustertest/utils_test.go b/clustertest/utils_test.go index 14ae94cb..f9efeda5 100644 --- a/clustertest/utils_test.go +++ b/clustertest/utils_test.go @@ -12,6 +12,7 @@ import ( "os/exec" "path/filepath" "runtime" + "strings" "sync" "testing" "time" @@ -159,42 +160,43 @@ func pointerTo[T any](v T) *T { return &v } +func dockerCmd(t testing.TB, ctx context.Context, args ...string) string { + t.Helper() + + tLogf(t, "executing 
command: docker %s", strings.Join(args, " ")) + + var w strings.Builder + cmd := exec.CommandContext(ctx, "docker", args...) + cmd.Stdout = &w + cmd.Stderr = &w + err := cmd.Run() + out := w.String() + require.NoError(t, err, "docker command failed: %s", out) + + return strings.TrimSpace(out) +} + // waitForTaskComplete polls a database task until it completes, fails, or times out. func waitForTaskComplete(ctx context.Context, c client.Client, dbID api.Identifier, taskID string, timeout time.Duration) error { ctx, cancel := context.WithTimeout(ctx, timeout) defer cancel() - ticker := time.NewTicker(2 * time.Second) - defer ticker.Stop() - - for { - select { - case <-ctx.Done(): - return fmt.Errorf("timeout waiting for task %s to complete", taskID) - case <-ticker.C: - task, err := c.GetDatabaseTask(ctx, &api.GetDatabaseTaskPayload{ - DatabaseID: dbID, - TaskID: taskID, - }) - if err != nil { - return fmt.Errorf("failed to get task: %w", err) - } - - switch task.Status { - case client.TaskStatusCompleted: - return nil - case client.TaskStatusFailed: - errMsg := "unknown error" - if task.Error != nil { - errMsg = *task.Error - } - return fmt.Errorf("task failed: %s", errMsg) - case client.TaskStatusCanceled: - return fmt.Errorf("task was canceled") - // "pending", "running", "canceling" - continue waiting - } + task, err := c.WaitForDatabaseTask(ctx, &api.GetDatabaseTaskPayload{ + DatabaseID: dbID, + TaskID: taskID, + }) + if err != nil { + return fmt.Errorf("failed to wait for task: %w", err) + } + if task.Status != client.TaskStatusCompleted { + var taskError string + if task.Error != nil { + taskError = *task.Error } + return fmt.Errorf("task status is '%s' instead of 'completed', error=%s", task.Status, taskError) } + + return nil } // waitForDatabaseAvailable polls a database until it reaches available state or times out. 
From 4e0772185078fb08eca6cccdb181a7a60c0cc27e Mon Sep 17 00:00:00 2001 From: Jason Lynch Date: Mon, 4 May 2026 14:37:18 -0400 Subject: [PATCH 11/15] chore: add 'old' pgedge repos to dev-lima pgEdge only makes the latest package versions available in the `release` repositories. Older versions of packages are only available in the `release_old` repositories. This commit adds the `release_old` repositories to the `dev-lima` environment so that we can install older versions of Postgres and other components in this environment. PLAT-512 --- .../install_prerequisites/tasks/main.yaml | 21 +++++++++++++++++++ 1 file changed, 21 insertions(+) diff --git a/lima/roles/install_prerequisites/tasks/main.yaml b/lima/roles/install_prerequisites/tasks/main.yaml index 5dfe7549..c6881ee2 100644 --- a/lima/roles/install_prerequisites/tasks/main.yaml +++ b/lima/roles/install_prerequisites/tasks/main.yaml @@ -15,6 +15,24 @@ name: https://dnf.pgedge.com/reporpm/pgedge-release-latest.noarch.rpm state: present disable_gpg_check: true +- name: Add pgEdge old repository + ansible.builtin.yum_repository: + name: pgedge-old + description: pgEdge Old RPM Repository + baseurl: "https://dnf.pgedge.com/release_old/{{ansible_facts['distribution_major_version']}}/RPMS/{{ansible_facts['architecture']}}/" + # We only host one version of our GPG keys and it's invalid for these old + # packages. 
+ gpgcheck: no + enabled: yes + sslverify: yes +- name: Add pgEdge noarch old repository + ansible.builtin.yum_repository: + name: pgedge-noarch-old + description: pgEdge Old RPM Repository (noarch) + baseurl: "https://dnf.pgedge.com/release_old/{{ansible_facts['distribution_major_version']}}/RPMS/noarch/" + gpgcheck: no + enabled: yes + sslverify: yes - name: Install prerequisites ansible.builtin.package: name: '{{ item }}' @@ -75,5 +93,8 @@ when: chronycfg.changed - name: Install delve debugger ansible.builtin.command: /usr/local/go/bin/go install github.com/go-delve/delve/cmd/dlv@latest + args: + creates: /root/go/bin/dlv - name: Fix clocks ansible.builtin.command: chronyc -a makestep + changed_when: false From 7f97991ec26ec2b8e6a458c52ef96842f3caa8ab Mon Sep 17 00:00:00 2001 From: Jason Lynch Date: Mon, 4 May 2026 14:39:39 -0400 Subject: [PATCH 12/15] docs: update configuration doc Adds the new configuration setting and logging component to the configuration guide. PLAT-512 --- docs/installation/configuration.md | 7 ++----- 1 file changed, 2 insertions(+), 5 deletions(-) diff --git a/docs/installation/configuration.md b/docs/installation/configuration.md index 6471066b..8bd75cf5 100644 --- a/docs/installation/configuration.md +++ b/docs/installation/configuration.md @@ -16,11 +16,6 @@ Use a comma-separated string to specify string array properties, such as PGEDGE_CLIENT_ADDRESSES='192.168.1.2,my-host.internal' ``` -- [Configuration Reference](#configuration-reference) - - [Required Settings](#required-settings) - - [Optional Settings](#optional-settings) - - [Components](#components) - ## Required Settings | Property | Environment variable | Type | Description | Constraints | @@ -55,12 +50,14 @@ PGEDGE_CLIENT_ADDRESSES='192.168.1.2,my-host.internal' | `docker_swarm.database_networks_cidr` | `PGEDGE_DOCKER_SWARM__DATABASE_NETWORKS_CIDR` | string | `10.128.128.0/18` | The CIDR used to allocate per-database networks. | Must not be changed after creating databases. 
| | `docker_swarm.database_networks_subnet_bits` | `PGEDGE_DOCKER_SWARM__DATABASE_NETWORKS_SUBNET_BITS` | int | `26` | The subnet size for per-database networks. | Must not be changed after creating databases. | | `database_owner_uid` | `PGEDGE_DATABASE_OWNER_UID` | int | `26` | The UID to use for database configuration and data. | Must match the UID that owns the Postgres server processes. | +| `databases_monitor_interval_seconds` | `PGEDGE_DATABASES_MONITOR_INTERVAL_SECONDS` | uint | `30` | The refresh interval for the 'databases' monitor. This monitor watches for database version changes that happen outside of the Control Plane API, such as through a system package update. | Set to `0` to disable this monitor. | ### Components This is the current list of components that can be configured in the `logging.component_levels` setting: - `api_server` +- `database_service` - `election_candidate` - `embedded_etcd` - `migration` From fead535ccf05a77ae4add3c745818cd604b9fa44 Mon Sep 17 00:00:00 2001 From: Jason Lynch Date: Mon, 4 May 2026 14:58:28 -0400 Subject: [PATCH 13/15] docs: minor postgres version upgrades for systemd Updates the systemd document to describe the Postgres minor version upgrade process. PLAT-512 --- docs/installation/systemd.md | 36 ++++++++++++++++++++++++++++++++++-- 1 file changed, 34 insertions(+), 2 deletions(-) diff --git a/docs/installation/systemd.md b/docs/installation/systemd.md index 95dd1604..c6cfdbdb 100644 --- a/docs/installation/systemd.md +++ b/docs/installation/systemd.md @@ -25,8 +25,9 @@ manage Postgres instances rather than Docker containers. The systemd installation method has the following known limitations in the current release. -- The Postgres version of a database must not be changed after the database is - created; support for package upgrades is coming in a subsequent release. +- Database upgrades are not supported via the API. Minor version upgrades can be + performed manually by a system administrator. 
Further support for package + management and database upgrades will be added in subsequent releases. - Supporting Services are not yet supported on systemd clusters; support is coming in a subsequent release. - All hosts in a cluster must use the same orchestrator (either `swarm` or @@ -236,6 +237,37 @@ instructions. > systemd clusters. As with other port fields, you can specify `0` to > assign a random port. +## Performing Postgres Minor Version Upgrades + +Database upgrades are not yet supported via the Control Plane API, but system +administrators can perform minor Postgres version upgrades by updating the +packages on each machine. Follow these steps on each host in the cluster: + +1. Upgrade Postgres and/or other components using `dnf upgrade`. For example: + + ```sh + sudo dnf upgrade pgedge-postgresql18 + ``` + +2. Find the systemd unit names for your database instances by listing units that + have the `patroni-*` prefix: + + ```sh + sudo systemctl list-units 'patroni-*' + ``` + +3. Restart each service: + + ```sh + sudo systemctl try-restart <unit-name> + ``` + +To minimize the risk of downtime, we recommend upgrading one host at a time, +starting with hosts running replica instances. + +After completing the upgrade on all hosts, it may take up to 30 seconds for the +new versions to be reflected in the database spec in the Control Plane API. + ## Updating the Control Plane Updating the Control Plane requires stopping the service, installing the new From 8c65137205884c9bcf5d6960e70414675f25b854 Mon Sep 17 00:00:00 2001 From: Jason Lynch Date: Mon, 4 May 2026 15:04:48 -0400 Subject: [PATCH 14/15] docs: changelog entries Adds changelog entries for db version reconciliation and the instance monitor change. 
PLAT-512 --- changes/unreleased/Added-20260504-150235.yaml | 3 +++ changes/unreleased/Changed-20260504-152348.yaml | 3 +++ 2 files changed, 6 insertions(+) create mode 100644 changes/unreleased/Added-20260504-150235.yaml create mode 100644 changes/unreleased/Changed-20260504-152348.yaml diff --git a/changes/unreleased/Added-20260504-150235.yaml b/changes/unreleased/Added-20260504-150235.yaml new file mode 100644 index 00000000..05fa0862 --- /dev/null +++ b/changes/unreleased/Added-20260504-150235.yaml @@ -0,0 +1,3 @@ +kind: Added +body: Added a feature to enable manual Postgres minor version updates in systemd clusters. The Control Plane will now update its copy of the database spec when it detects changes to an instance's Postgres or Spock version. +time: 2026-05-04T15:02:35.045407-04:00 diff --git a/changes/unreleased/Changed-20260504-152348.yaml b/changes/unreleased/Changed-20260504-152348.yaml new file mode 100644 index 00000000..fa9dc8bb --- /dev/null +++ b/changes/unreleased/Changed-20260504-152348.yaml @@ -0,0 +1,3 @@ +kind: Changed +body: Changed the instance monitoring system to query the Postgres and Spock versions for replica instances and report them in the databases API. +time: 2026-05-04T15:23:48.324604-04:00 From 2a26c91566a4fdf7ee6454aed4516c5b70abca8e Mon Sep 17 00:00:00 2001 From: Jason Lynch Date: Wed, 13 May 2026 09:41:00 -0400 Subject: [PATCH 15/15] fix: reconcile versions improvements Makes two minor improvements to the database version reconciliation code: - Skip instance records with nil `PgEdgeVersion` - I've added a new test case to test this scenario - Rearrange `reconcileNodeVersions` logic to remove redundant condition and to only normalize the postgres versions when the postgres versions have changed. - I updated the existing spock-only update test case to ensure that the postgres versions are not normalized. 
PLAT-512 --- .../internal/database/reconcile_versions.go | 12 +-- .../database/reconcile_versions_test.go | 86 ++++++++++++++++--- 2 files changed, 82 insertions(+), 16 deletions(-) diff --git a/server/internal/database/reconcile_versions.go b/server/internal/database/reconcile_versions.go index b27dccf7..a07cf425 100644 --- a/server/internal/database/reconcile_versions.go +++ b/server/internal/database/reconcile_versions.go @@ -123,6 +123,9 @@ func reconcileInstanceVersions( } instancesByNodeHost := make(map[nodeHostKey]*StoredInstance, len(instances)) for _, instance := range instances { + if instance.PgEdgeVersion == nil { + continue + } status, ok := statusesByID[instance.InstanceID] if !ok || status.Status.IsStale() { continue @@ -199,16 +202,13 @@ func reconcileNodeVersions( spockMatches = false } } + if updatedSpec != nil { + updatedSpec.NormalizePostgresVersions() + } if spockMatches && commonSpockVersion != "" && commonSpockVersion != spec.SpockVersion { spec.SpockVersion = commonSpockVersion updatedSpec = spec } - if updatedSpec != nil { - updatedSpec.NormalizePostgresVersions() - if spockMatches && commonSpockVersion != "" { - updatedSpec.SpockVersion = commonSpockVersion - } - } return updatedSpec } diff --git a/server/internal/database/reconcile_versions_test.go b/server/internal/database/reconcile_versions_test.go index bdd7e700..92ffb774 100644 --- a/server/internal/database/reconcile_versions_test.go +++ b/server/internal/database/reconcile_versions_test.go @@ -129,8 +129,16 @@ func TestReconcileVersions(t *testing.T) { PostgresVersion: "17.4", SpockVersion: "5", Nodes: []*database.Node{ - {Name: "n1", HostIDs: []string{"host-1"}}, - {Name: "n2", HostIDs: []string{"host-2"}}, + { + Name: "n1", + HostIDs: []string{"host-1"}, + PostgresVersion: "17.5", + }, + { + Name: "n2", + HostIDs: []string{"host-2"}, + PostgresVersion: "17.5", + }, }, }, }, @@ -139,13 +147,13 @@ func TestReconcileVersions(t *testing.T) { InstanceID: "n1-host-1", NodeName: "n1", 
HostID: "host-1", - PgEdgeVersion: ds.MustParsePgEdgeVersion("17.4", "5"), + PgEdgeVersion: ds.MustParsePgEdgeVersion("17.5", "5"), }, { InstanceID: "n2-host-2", NodeName: "n2", HostID: "host-2", - PgEdgeVersion: ds.MustParsePgEdgeVersion("17.4", "5"), + PgEdgeVersion: ds.MustParsePgEdgeVersion("17.5", "5"), }, }, statuses: []*database.StoredInstanceStatus{ @@ -154,7 +162,7 @@ func TestReconcileVersions(t *testing.T) { Status: &database.InstanceStatus{ StatusUpdatedAt: utils.PointerTo(time.Now()), Role: utils.PointerTo(patroni.InstanceRolePrimary), - PostgresVersion: utils.PointerTo("17.4"), + PostgresVersion: utils.PointerTo("17.5"), SpockVersion: utils.PointerTo("6.0.0"), }, }, @@ -163,7 +171,7 @@ func TestReconcileVersions(t *testing.T) { Status: &database.InstanceStatus{ StatusUpdatedAt: utils.PointerTo(time.Now()), Role: utils.PointerTo(patroni.InstanceRolePrimary), - PostgresVersion: utils.PointerTo("17.4"), + PostgresVersion: utils.PointerTo("17.5"), SpockVersion: utils.PointerTo("6.0.0"), }, }, @@ -173,8 +181,16 @@ func TestReconcileVersions(t *testing.T) { PostgresVersion: "17.4", SpockVersion: "6", Nodes: []*database.Node{ - {Name: "n1", HostIDs: []string{"host-1"}}, - {Name: "n2", HostIDs: []string{"host-2"}}, + { + Name: "n1", + HostIDs: []string{"host-1"}, + PostgresVersion: "17.5", // These overrides should remain unnormalized since only the spock version changed + }, + { + Name: "n2", + HostIDs: []string{"host-2"}, + PostgresVersion: "17.5", + }, }, }, }, @@ -183,13 +199,13 @@ func TestReconcileVersions(t *testing.T) { InstanceID: "n1-host-1", NodeName: "n1", HostID: "host-1", - PgEdgeVersion: ds.MustParsePgEdgeVersion("17.4", "6"), + PgEdgeVersion: ds.MustParsePgEdgeVersion("17.5", "6"), }, { InstanceID: "n2-host-2", NodeName: "n2", HostID: "host-2", - PgEdgeVersion: ds.MustParsePgEdgeVersion("17.4", "6"), + PgEdgeVersion: ds.MustParsePgEdgeVersion("17.5", "6"), }, }, }, @@ -460,6 +476,56 @@ func TestReconcileVersions(t *testing.T) { }, }, }, + { + 
name: "malformed instance records", + spec: &database.StoredSpec{ + Spec: &database.Spec{ + PostgresVersion: "17.4", + SpockVersion: "5", + Nodes: []*database.Node{ + {Name: "n1", HostIDs: []string{"host-1"}}, + {Name: "n2", HostIDs: []string{"host-2"}}, + }, + }, + }, + // These instances are missing a PgEdgeVersion due to a failure + // somewhere else in the system. + instances: []*database.StoredInstance{ + { + InstanceID: "n1-host-1", + NodeName: "n1", + HostID: "host-1", + }, + { + InstanceID: "n2-host-2", + NodeName: "n2", + HostID: "host-2", + }, + }, + // These instances are up and running even though the instance + // records are malformed. Otherwise, reconcileVersions will skip + // these instances. + statuses: []*database.StoredInstanceStatus{ + { + InstanceID: "n1-host-1", + Status: &database.InstanceStatus{ + StatusUpdatedAt: utils.PointerTo(time.Now()), + Role: utils.PointerTo(patroni.InstanceRoleReplica), + PostgresVersion: utils.PointerTo("17.4"), + SpockVersion: utils.PointerTo("5.0.6"), + }, + }, + { + InstanceID: "n2-host-2", + Status: &database.InstanceStatus{ + StatusUpdatedAt: utils.PointerTo(time.Now()), + Role: utils.PointerTo(patroni.InstanceRoleReplica), + PostgresVersion: utils.PointerTo("17.4"), + SpockVersion: utils.PointerTo("5.0.6"), + }, + }, + }, + }, } { t.Run(tc.name, func(t *testing.T) { updatedSpec, updatedInstances := database.ReconcileVersions(