diff --git a/base/audit_events.go b/base/audit_events.go index bde05160ca..f16bd8f7ff 100644 --- a/base/audit_events.go +++ b/base/audit_events.go @@ -55,8 +55,11 @@ const ( AuditIDSyncGatewayProfiling AuditID = 53304 // SG cluster events - AuditIDClusterInfoRead AuditID = 53350 - AuditIDPostUpgrade AuditID = 54043 + AuditIDClusterInfoRead AuditID = 53350 + AuditIDClusterCompatVersionRead AuditID = 53351 + AuditIDClusterCompatVersionFreeze AuditID = 53352 + AuditIDClusterCompatVersionUnfreeze AuditID = 53353 + AuditIDPostUpgrade AuditID = 54043 // Database events AuditIDCreateDatabase AuditID = 54000 @@ -464,6 +467,38 @@ var AuditEvents = events{ EventType: eventTypeAdmin, IsGlobalEvent: true, }, + AuditIDClusterCompatVersionRead: { + Name: "Sync Gateway cluster compatibility version read", + Description: "Sync Gateway cluster compatibility version was viewed", + EnabledByDefault: true, + FilteringPermitted: true, + EventType: eventTypeAdmin, + IsGlobalEvent: true, + }, + AuditIDClusterCompatVersionFreeze: { + Name: "Sync Gateway cluster compatibility version frozen", + Description: "An admin pinned the Sync Gateway cluster compatibility version to its current value", + MandatoryFields: AuditFields{ + AuditFieldClusterCompatVersion: "4.1", + AuditFieldFrozenAt: "2026-01-02T15:04:05Z", + }, + EnabledByDefault: true, + FilteringPermitted: true, + EventType: eventTypeAdmin, + IsGlobalEvent: true, + }, + AuditIDClusterCompatVersionUnfreeze: { + Name: "Sync Gateway cluster compatibility version unfrozen", + Description: "An admin cleared the Sync Gateway cluster compatibility version freeze. 
The cluster_compat_version and frozen_at fields describe the freeze record that was lifted (when it was originally set), not the time of the unfreeze action.", + OptionalFields: AuditFields{ + AuditFieldClusterCompatVersion: "4.1", + AuditFieldFrozenAt: "2026-01-02T15:04:05Z", + }, + EnabledByDefault: true, + FilteringPermitted: true, + EventType: eventTypeAdmin, + IsGlobalEvent: true, + }, AuditIDCreateDatabase: { Name: "Create database", Description: "A new database was created", diff --git a/base/audit_events_fields.go b/base/audit_events_fields.go index 9b3ea55568..efea3cad48 100644 --- a/base/audit_events_fields.go +++ b/base/audit_events_fields.go @@ -86,4 +86,8 @@ const ( AuditFieldDocIDs = "doc_ids" AuditFieldFeedType = "feed_type" AuditFieldIncludeDocs = "include_docs" + + // Cluster compat version freeze events 53352, 53353 + AuditFieldClusterCompatVersion = "cluster_compat_version" + AuditFieldFrozenAt = "frozen_at" ) diff --git a/base/version_cluster_compat.go b/base/version_cluster_compat.go index 2a1e2ba669..6dc8d4f169 100644 --- a/base/version_cluster_compat.go +++ b/base/version_cluster_compat.go @@ -101,6 +101,14 @@ type RegistryNode struct { Databases map[string]string `json:"databases,omitempty"` } +// RegistryFreeze records an admin-issued cluster compatibility version freeze stored in a +// bucket registry. When present, the cluster compatibility version reported by Sync Gateway is +// pinned to Version, preventing it from advancing as nodes are upgraded. +type RegistryFreeze struct { + Version ClusterCompatVersion `json:"version"` + FrozenAt time.Time `json:"frozen_at"` +} + // ParseClusterCompatVersion parses a "major.minor" string into a ClusterCompatVersion. 
func ParseClusterCompatVersion(s string) (ClusterCompatVersion, error) { parts := strings.SplitN(s, ".", 2) diff --git a/docs/api/admin.yaml b/docs/api/admin.yaml index eb5a58d3e2..adcedeea4c 100644 --- a/docs/api/admin.yaml +++ b/docs/api/admin.yaml @@ -79,6 +79,12 @@ paths: $ref: ./paths/admin/_config.yaml /_cluster_info: $ref: ./paths/admin/_cluster_info.yaml + /_cluster_compat_version: + $ref: ./paths/admin/_cluster_compat_version.yaml + /_cluster_compat_version/freeze: + $ref: ./paths/admin/_cluster_compat_version-freeze.yaml + /_cluster_compat_version/unfreeze: + $ref: ./paths/admin/_cluster_compat_version-unfreeze.yaml /_status: $ref: ./paths/admin/_status.yaml /_sgcollect_info: diff --git a/docs/api/components/schemas.yaml b/docs/api/components/schemas.yaml index 66f589da95..494652bc10 100644 --- a/docs/api/components/schemas.yaml +++ b/docs/api/components/schemas.yaml @@ -3356,6 +3356,44 @@ ClusterCompatVersion: nullable: true example: '4.1' title: Cluster Compatibility Version +ClusterCompatVersionState: + type: object + description: |- + Cluster compatibility version state. + + Without a freeze, `cluster_compat_version` is the minimum version across all live nodes + in the cluster, and advances automatically as nodes are upgraded. + + A freeze pins `cluster_compat_version` to the value captured at the time of the freeze, + preventing it from advancing as nodes upgrade. This preserves the option to roll a node + back to the prior major/minor before committing to the upgrade. + + The presence of `frozen_cluster_compat_version` indicates that a frozen version is currently set; + its absence indicates no version has been frozen. + properties: + cluster_compat_version: + allOf: + - $ref: '#/ClusterCompatVersion' + description: |- + The currently-reported cluster compatibility version. Equal to + `frozen_cluster_compat_version` when a freeze is set, otherwise the minimum across + all live nodes. Omitted if no version has been computed yet (e.g. 
immediately after + startup, before any node has registered). + nodes: + type: object + description: |- + Per-node cluster compatibility versions, keyed by node UID. Includes every node + currently registered in any tracked bucket registry. + additionalProperties: + x-additionalPropertiesName: nodeUID + $ref: '#/ClusterCompatVersion' + frozen_cluster_compat_version: + allOf: + - $ref: '#/ClusterCompatVersion' + description: |- + The cluster compatibility version captured at the time of the most recent freeze. + Omitted when no freeze is set. + title: Cluster Compatibility Version State DiagnosticFunctionException: description: The exception thrown during evaluation (if any). type: string @@ -3445,6 +3483,14 @@ GatewayRegistry: additionalProperties: x-additionalPropertiesName: nodeUID $ref: '#/RegistryNode' + frozen_cluster_compat_version: + allOf: + - $ref: '#/RegistryFrozenClusterCompatVersion' + description: |- + Cluster compatibility version freeze record. When present, the + cluster compatibility version reported by this Sync Gateway is capped at + `frozen_cluster_compat_version.version` until the freeze is cleared via + `POST /_cluster_compat_version/unfreeze`. RegistryNode: type: object @@ -3459,6 +3505,27 @@ RegistryNode: format: date-time description: Time of the node's last heartbeat write to this registry. +RegistryFrozenClusterCompatVersion: + type: object + description: |- + A cluster compatibility version freeze record stored in a bucket registry. Set by + `POST /_cluster_compat_version/freeze` and cleared by + `POST /_cluster_compat_version/unfreeze`. Cluster-wide freeze state is the aggregate of + these records across all bucket registries. + properties: + version: + allOf: + - $ref: '#/ClusterCompatVersion' + description: |- + The cluster compatibility version captured at the time of the freeze. + frozen_at: + type: string + format: date-time + description: Time at which the freeze was set. 
+ required: + - version + - frozen_at + RegistryConfigGroup: type: object properties: diff --git a/docs/api/paths/admin/_cluster_compat_version-freeze.yaml b/docs/api/paths/admin/_cluster_compat_version-freeze.yaml new file mode 100644 index 0000000000..86a96d44e9 --- /dev/null +++ b/docs/api/paths/admin/_cluster_compat_version-freeze.yaml @@ -0,0 +1,53 @@ +# Copyright 2026-Present Couchbase, Inc. +# +# Use of this software is governed by the Business Source License included +# in the file licenses/BSL-Couchbase.txt. As of the Change Date specified +# in that file, in accordance with the Business Source License, use of this +# software will be governed by the Apache License, Version 2.0, included in +# the file licenses/APL2.txt. +post: + summary: Freeze the cluster compatibility version at its current value + description: |- + Captures the current cluster compatibility version and pins the cluster to this version. + + While frozen, subsequent node upgrades will not advance the reported `cluster_compat_version`, preserving the option to roll a node back to the prior major/minor. + + Returns success only if every tracked bucket accepts the freeze. + If one or more buckets fail to accept it, a `503` is returned with the current state — the cluster compatibility version may still be partially pinned, and the request should be retried. + + To clear the pinned version, call `POST /_cluster_compat_version/unfreeze`. 
+ + Required Sync Gateway RBAC roles: + + * Sync Gateway Dev Ops + responses: + '200': + description: Cluster compatibility version has been frozen + content: + application/json: + schema: + $ref: ../../components/schemas.yaml#/ClusterCompatVersionState + '403': + $ref: ../../components/responses.yaml#/Unauthorized + '500': + description: An unexpected error occurred while freezing the cluster compatibility version + content: + application/json: + schema: + $ref: ../../components/schemas.yaml#/HTTP-Error + '503': + description: |- + The freeze could not be fully applied. Possible reasons: + * Cluster compatibility version tracking is not enabled on this node. + * The cluster compatibility version has not yet been computed (e.g. immediately after startup). + * No tracked bucket registries. + * One or more tracked buckets did not accept the freeze (partial failure). In this case the response body is a `ClusterCompatVersionState` describing the version (if any) that is currently pinned, so the admin can see what did take effect. + content: + application/json: + schema: + oneOf: + - $ref: ../../components/schemas.yaml#/HTTP-Error + - $ref: ../../components/schemas.yaml#/ClusterCompatVersionState + tags: + - Server + operationId: post__cluster_compat_version_freeze diff --git a/docs/api/paths/admin/_cluster_compat_version-unfreeze.yaml b/docs/api/paths/admin/_cluster_compat_version-unfreeze.yaml new file mode 100644 index 0000000000..ece8d5c42e --- /dev/null +++ b/docs/api/paths/admin/_cluster_compat_version-unfreeze.yaml @@ -0,0 +1,52 @@ +# Copyright 2026-Present Couchbase, Inc. +# +# Use of this software is governed by the Business Source License included +# in the file licenses/BSL-Couchbase.txt. As of the Change Date specified +# in that file, in accordance with the Business Source License, use of this +# software will be governed by the Apache License, Version 2.0, included in +# the file licenses/APL2.txt. 
+post: + summary: Clear the cluster compatibility version freeze + description: |- + Clears any pinned cluster compatibility version previously frozen via `POST /_cluster_compat_version/freeze`. + + The reported `cluster_compat_version` will resume tracking the live-node minimum and may immediately advance if all nodes have been upgraded since the freeze was set. + + Returns success only if the cluster is fully unfrozen. + If any part of the cluster remains frozen after the request, a `503` is returned with the current state - + the cluster compatibility version may still be held back, and the request should be retried. + + Required Sync Gateway RBAC roles: + + * Sync Gateway Dev Ops + responses: + '200': + description: Cluster compatibility version freeze has been cleared + content: + application/json: + schema: + $ref: ../../components/schemas.yaml#/ClusterCompatVersionState + '403': + $ref: ../../components/responses.yaml#/Unauthorized + '500': + description: An unexpected error occurred while clearing the cluster compatibility version freeze + content: + application/json: + schema: + $ref: ../../components/schemas.yaml#/HTTP-Error + '503': + description: |- + The unfreeze did not fully apply. The cluster compatibility version may still be held back + and the request should be retried. The response body is either a `ClusterCompatVersionState` + showing the residual freeze, or an `HTTP-Error` when cluster compatibility version tracking + is not enabled on this node, or when the unfreeze partially failed and the residual freeze + state could not be verified. 
+ content: + application/json: + schema: + oneOf: + - $ref: ../../components/schemas.yaml#/ClusterCompatVersionState + - $ref: ../../components/schemas.yaml#/HTTP-Error + tags: + - Server + operationId: post__cluster_compat_version_unfreeze diff --git a/docs/api/paths/admin/_cluster_compat_version.yaml b/docs/api/paths/admin/_cluster_compat_version.yaml new file mode 100644 index 0000000000..26fa773b8f --- /dev/null +++ b/docs/api/paths/admin/_cluster_compat_version.yaml @@ -0,0 +1,38 @@ +# Copyright 2026-Present Couchbase, Inc. +# +# Use of this software is governed by the Business Source License included +# in the file licenses/BSL-Couchbase.txt. As of the Change Date specified +# in that file, in accordance with the Business Source License, use of this +# software will be governed by the Apache License, Version 2.0, included in +# the file licenses/APL2.txt. +get: + summary: Get the cluster compatibility version state + description: |- + Returns the cluster-wide cluster compatibility version, the per-node versions registered + in the cluster, and the frozen value if a freeze is currently in effect. + + Without a freeze, `cluster_compat_version` is the minimum across all live nodes. With a + freeze, `cluster_compat_version` is pinned to `frozen_cluster_compat_version`. The + presence of `frozen_cluster_compat_version` in the response indicates that a freeze is set. 
+ + Required Sync Gateway RBAC roles: + + * Sync Gateway Dev Ops + responses: + '200': + description: Returned the cluster compatibility version state successfully + content: + application/json: + schema: + $ref: ../../components/schemas.yaml#/ClusterCompatVersionState + '403': + $ref: ../../components/responses.yaml#/Unauthorized + '503': + description: Cluster compatibility version tracking is not enabled on this node + content: + application/json: + schema: + $ref: ../../components/schemas.yaml#/HTTP-Error + tags: + - Server + operationId: get__cluster_compat_version diff --git a/rest/cluster_compat.go b/rest/cluster_compat.go index c211952220..f6d973d570 100644 --- a/rest/cluster_compat.go +++ b/rest/cluster_compat.go @@ -10,13 +10,36 @@ package rest import ( "context" + "errors" "fmt" "sync" + "sync/atomic" "time" "github.com/couchbase/sync_gateway/base" + "github.com/couchbase/sync_gateway/db" ) +// ErrFreezeNoVersion is returned by clusterCompatManager.Freeze when no cluster compatibility +// version has been observed yet (e.g. immediately after startup, before any node has registered). +var ErrFreezeNoVersion = errors.New("cluster compatibility version not yet computed") + +// ErrFreezeNoBucketsWritten is returned by clusterCompatManager.Freeze when there are no +// tracked buckets at all — there is nowhere to persist the freeze. Distinct from +// ErrFreezePartial which signals that some buckets accepted the freeze and others didn't. +var ErrFreezeNoBucketsWritten = errors.New("freeze could not be applied: no tracked buckets") + +// ErrFreezePartial is returned by clusterCompatManager.Freeze when one or more tracked +// buckets did not accept the freeze. The cluster compatibility version may only be partially +// pinned in this case — the caller should surface the returned aggregate freeze record so +// the admin can see which version (if any) is still in effect. 
+var ErrFreezePartial = errors.New("freeze did not fully apply across all tracked buckets") + +// ErrUnfreezePartial is returned by clusterCompatManager.Unfreeze when one or more tracked +// buckets still hold a freeze record after the operation. The cluster compatibility version +// may still be held back. +var ErrUnfreezePartial = errors.New("unfreeze did not fully apply across all tracked buckets") + // defaultNodeHeartbeatExpiry is the fallback expiry used when node_heartbeat_expiry is unset. // Sized to tolerate several missed refreshes (at the default config_update_frequency of 10s) before pruning a peer. const defaultNodeHeartbeatExpiry = 60 * time.Second @@ -40,9 +63,18 @@ type clusterCompatManager struct { // the manager — a transient RegisterNodeVersion failure must not cause us to drop // ownership or skip the bucket on next refresh / shutdown deregister. trackedBuckets map[string]struct{} - // cachedVersion/cachedNodes are observed state from the most recent successful refresh. + // cachedNodes is the union of node→version across all tracked bucket registries from + // the most recent observation. Input to computeCCV. + cachedNodes map[string]base.ClusterCompatVersion + // cachedFreeze is the aggregate freeze record across tracked bucket registries: any + // bucket having a freeze record means the cluster is frozen; the aggregate Version is + // the minimum across those records and FrozenAt is the earliest. Nil when no bucket + // reports a freeze. Stored separately for surfacing via the API and audit; its Version + // participates in computeCCV alongside the node versions. + cachedFreeze *base.RegistryFreeze + // cachedVersion is the reported cluster compat version: computeCCV(cachedNodes, cachedFreeze). + // Nil when nothing has been observed yet. 
cachedVersion *base.ClusterCompatVersion - cachedNodes map[string]base.ClusterCompatVersion // appliedDBVersions tracks the database config version this node has successfully // applied, keyed by bucket name then database name. Populated by _applyConfig on // successful load, consumed by RegisterNodeVersion to stamp the registry. @@ -54,7 +86,8 @@ type clusterCompatManager struct { mu sync.RWMutex } -// getCachedVersion returns a copy of the cached cluster compat version, or nil if not computed. +// getCachedVersion returns a copy of the currently-reported cluster compat version, or nil +// if not computed. func (m *clusterCompatManager) getCachedVersion() *base.ClusterCompatVersion { m.mu.RLock() defer m.mu.RUnlock() @@ -65,6 +98,17 @@ func (m *clusterCompatManager) getCachedVersion() *base.ClusterCompatVersion { return &cp } +// getCachedFreeze returns a copy of the aggregate cached freeze, or nil if no freeze is set. +func (m *clusterCompatManager) getCachedFreeze() *base.RegistryFreeze { + m.mu.RLock() + defer m.mu.RUnlock() + if m.cachedFreeze == nil { + return nil + } + cp := *m.cachedFreeze + return &cp +} + // refreshInterval is the rate-limit window for periodic Refresh calls. The heartbeat write // to each bucket's registry doc only needs to happen well within the (eventual) heartbeat // expiry — rewriting on every poll tick just churns CAS for no benefit and worsens contention @@ -173,23 +217,51 @@ func (m *clusterCompatManager) RegisterBucket(ctx context.Context, bucket string if !m.claimBucket(bucket) { return nil } - registry, err := m.sc.BootstrapContext.RegisterNodeVersion(ctx, bucket, m.sc.NodeUID, m.sc.Config.Bootstrap.ConfigGroupID, m.sc.BootstrapContext.clusterCompatVersion, m.getAppliedDBVersionsForBucket(bucket), m.heartbeatExpiry()) + // ratchetHWM=false here: HWM is monotonic and cannot be rolled back, so an advance + // committed off transient startup state would lock the cluster at a too-high value + // forever. 
The HWM ratchet happens later via the periodic Refresh, gated per-bucket on + // at least one database having reached DBOnline (see ratchetEligibleBuckets). + registry, err := m.sc.BootstrapContext.RegisterNodeVersion(ctx, bucket, m.sc.NodeUID, m.sc.Config.Bootstrap.ConfigGroupID, m.sc.BootstrapContext.clusterCompatVersion, m.getAppliedDBVersionsForBucket(bucket), m.heartbeatExpiry(), false) if err != nil { m.releaseBucket(bucket) return err } - // Merge this bucket's registry into the cached view and recompute the min. A node - // removed from this bucket but still present in another tracked bucket would not - // be evicted here — that's reconciled by the periodic Refresh, which rebuilds the - // cache from scratch. - oldVersion, newVersion := m.mergeRegistryNodesIntoCache(registry.Nodes) - base.InfofCtx(ctx, base.KeyConfig, "Registered node %s in bucket %s; cluster compatibility version is %v", m.sc.NodeUID, base.MD(bucket), &newVersion) - if oldVersion != nil && *oldVersion != newVersion { - base.InfofCtx(ctx, base.KeyConfig, "Cluster compatibility version changed from %v to %v", oldVersion, &newVersion) + // Merge this bucket's registry into the cached view and recompute. A node removed from + // this bucket but still present in another tracked bucket would not be evicted here — + // that's reconciled by the periodic Refresh, which rebuilds the cache from scratch. Same + // for the freeze record: a freeze cleared elsewhere stays reflected here until the next + // Refresh recomputes the aggregate. 
+ oldVersion, newVersion := m.mergeRegistryIntoCache(registry.Nodes, registry.Frozen) + base.InfofCtx(ctx, base.KeyConfig, "Registered node %s in bucket %s; cluster compatibility version is %v", m.sc.NodeUID, base.MD(bucket), newVersion) + if !clusterCompatVersionEqual(oldVersion, newVersion) { + base.InfofCtx(ctx, base.KeyConfig, "Cluster compatibility version changed from %v to %v", oldVersion, newVersion) } return nil } +// ratchetEligibleBuckets returns the set of buckets with at least one DBOnline database. +// Used by Refresh to gate the HWM ratchet on per-bucket online state: ratcheting requires +// inputs (e.g. legacy-node detection via ISGR/cbgt) that are only available once the +// database is fully online. Buckets absent from the returned set stay heartbeat-only until +// the next refresh tick. +// +// Reads the database state directly from ServerContext rather than maintaining a shadow set +// — the DB state is authoritative and follows offline transitions automatically. Computed +// once per Refresh under a single _databasesLock acquisition so the per-bucket loop in +// refreshNodeRegistrations doesn't re-take the lock and rescan _databases for every bucket. +func (m *clusterCompatManager) ratchetEligibleBuckets() map[string]struct{} { + m.sc._databasesLock.RLock() + defer m.sc._databasesLock.RUnlock() + eligible := make(map[string]struct{}) + for _, dbContext := range m.sc._databases { + if atomic.LoadUint32(&dbContext.State) != db.DBOnline { + continue + } + eligible[dbContext.Bucket.GetName()] = struct{}{} + } + return eligible +} + // claimBucket atomically marks the bucket as tracked and reports whether this call was the // one that acquired tracking. Returns false if the bucket was already tracked, in which case // the caller should skip the per-first-time registration work. 
@@ -211,10 +283,10 @@ func (m *clusterCompatManager) releaseBucket(bucket string) { delete(m.trackedBuckets, bucket) } -// mergeRegistryNodesIntoCache folds the given bucket's node entries into the cached cluster -// view, recomputes the cluster compat version, and returns the previous and new cached -// versions for caller-side change logging. -func (m *clusterCompatManager) mergeRegistryNodesIntoCache(nodes map[string]*base.RegistryNode) (oldVersion *base.ClusterCompatVersion, newVersion base.ClusterCompatVersion) { +// mergeRegistryIntoCache folds the given bucket's node entries and freeze record into the +// cached cluster view, recomputes the cluster compat version, and returns the previous and +// new cached versions for caller-side change logging. +func (m *clusterCompatManager) mergeRegistryIntoCache(nodes map[string]*base.RegistryNode, freeze *base.RegistryFreeze) (oldVersion, newVersion *base.ClusterCompatVersion) { m.mu.Lock() defer m.mu.Unlock() if m.cachedNodes == nil { @@ -223,9 +295,12 @@ func (m *clusterCompatManager) mergeRegistryNodesIntoCache(nodes map[string]*bas for nodeUID, node := range nodes { m.cachedNodes[nodeUID] = node.Version } - newVersion = minClusterCompatVersion(m.cachedNodes) + if freeze != nil { + m.cachedFreeze = mergeFreeze(m.cachedFreeze, freeze) + } oldVersion = m.cachedVersion - m.cachedVersion = &newVersion + newVersion = computeCCV(m.cachedNodes, m.cachedFreeze) + m.cachedVersion = newVersion return oldVersion, newVersion } @@ -279,28 +354,38 @@ func (m *clusterCompatManager) Refresh(ctx context.Context) { return } - version, nodes, err := m.refreshNodeRegistrations(ctx) + nodes, freeze, err := m.refreshNodeRegistrations(ctx) if err != nil { base.WarnfCtx(ctx, "Failed to refresh cluster compat version: %v", err) return } - oldVersion := m.getCachedVersion() - if !clusterCompatVersionEqual(oldVersion, version) { - base.InfofCtx(ctx, base.KeyConfig, "Cluster compatibility version changed from %v to %v", oldVersion, version) - 
} + newVersion := computeCCV(nodes, freeze) + + // Race window: if a Freeze or Unfreeze completed between refreshNodeRegistrations + // returning and the lock acquisition below, this write overwrites their mutation with + // data that was observed *before* the admin op ran. The cache will be stale (a phantom + // freeze or a missing freeze) until the next periodic Refresh rebuilds from authoritative + // bucket state. Admin endpoints are rare, the window is bounded by the bucket I/O above, + // and the self-heal is automatic — accepted as a transient over more locking machinery. m.mu.Lock() - m.cachedVersion = version + oldVersion := m.cachedVersion m.cachedNodes = nodes + m.cachedFreeze = freeze + m.cachedVersion = newVersion m.lastRefreshAt = time.Now().UTC() m.mu.Unlock() + + if !clusterCompatVersionEqual(oldVersion, newVersion) { + base.InfofCtx(ctx, base.KeyConfig, "Cluster compatibility version changed from %v to %v", oldVersion, newVersion) + } } // refreshNodeRegistrations iterates the tracked buckets, registers this node, and returns -// the minimum version across all nodes in those registries and a flat map of all node -// versions. Returns an error if every tracked bucket failed so callers can leave the -// previously-cached state in place — stale is preferable to flipping the cluster compat -// version to nil on a transient bucket outage. -func (m *clusterCompatManager) refreshNodeRegistrations(ctx context.Context) (*base.ClusterCompatVersion, map[string]base.ClusterCompatVersion, error) { +// the per-node version map and the aggregate freeze record across those registries. Returns +// an error if every tracked bucket failed so callers can leave the previously-cached state +// in place — stale is preferable to flipping the cluster compat version to nil on a transient +// bucket outage. 
+func (m *clusterCompatManager) refreshNodeRegistrations(ctx context.Context) (map[string]base.ClusterCompatVersion, *base.RegistryFreeze, error) { buckets := m.trackedBucketList() if len(buckets) == 0 { return nil, nil, nil @@ -308,12 +393,19 @@ func (m *clusterCompatManager) refreshNodeRegistrations(ctx context.Context) (*b nodeVersion := m.sc.BootstrapContext.clusterCompatVersion expiry := m.heartbeatExpiry() + // Gate HWM ratchet on per-bucket online state — see ratchetEligibleBuckets. Heartbeat + // refresh happens unconditionally so the node entry stays fresh even for buckets whose + // databases haven't come online yet. Compute eligibility once per Refresh so the loop + // body doesn't re-acquire _databasesLock for every tracked bucket. + eligibleBuckets := m.ratchetEligibleBuckets() // Collect unique node versions across all bucket registries. A node appearing in multiple // bucket registries will have the same version — last-write-wins is fine. nodeMap := make(map[string]base.ClusterCompatVersion) + var aggregateFreeze *base.RegistryFreeze succeeded := 0 for _, bucket := range buckets { - registry, err := m.sc.BootstrapContext.RegisterNodeVersion(ctx, bucket, m.sc.NodeUID, m.sc.Config.Bootstrap.ConfigGroupID, nodeVersion, m.getAppliedDBVersionsForBucket(bucket), expiry) + _, ratchet := eligibleBuckets[bucket] + registry, err := m.sc.BootstrapContext.RegisterNodeVersion(ctx, bucket, m.sc.NodeUID, m.sc.Config.Bootstrap.ConfigGroupID, nodeVersion, m.getAppliedDBVersionsForBucket(bucket), expiry, ratchet) if err != nil { base.WarnfCtx(ctx, "Failed to register node version in bucket %s: %v", base.MD(bucket), err) continue @@ -322,22 +414,216 @@ func (m *clusterCompatManager) refreshNodeRegistrations(ctx context.Context) (*b for nodeUID, node := range registry.Nodes { nodeMap[nodeUID] = node.Version } + if registry.Frozen != nil { + aggregateFreeze = mergeFreeze(aggregateFreeze, registry.Frozen) + } } if succeeded == 0 { return nil, nil, fmt.Errorf("no tracked 
bucket registries could be updated (%d tracked)", len(buckets)) } - minCompatVersion := minClusterCompatVersion(nodeMap) - return &minCompatVersion, nodeMap, nil + return nodeMap, aggregateFreeze, nil } -// minClusterCompatVersion returns the minimum version across the given node map. Returns -// the zero value when the map is empty. -func minClusterCompatVersion(nodes map[string]base.ClusterCompatVersion) base.ClusterCompatVersion { - versions := make([]base.ClusterCompatVersion, 0, len(nodes)) +// computeCCV returns the reported cluster compat version: the minimum across the live-node +// versions and the freeze ceiling (when set). Returns nil when no versions are available. +// +// The single combining point for all CCV inputs. Future inputs (e.g. an HWM floor) should +// participate here rather than as a separate override path. +func computeCCV(nodes map[string]base.ClusterCompatVersion, freeze *base.RegistryFreeze) *base.ClusterCompatVersion { + versions := make([]base.ClusterCompatVersion, 0, len(nodes)+1) for _, v := range nodes { versions = append(versions, v) } - return base.MinClusterCompatVersion(versions...) + if freeze != nil { + versions = append(versions, freeze.Version) + } + if len(versions) == 0 { + return nil + } + v := base.MinClusterCompatVersion(versions...) + return &v +} + +// mergeFreeze combines two freeze records into one. The result has the minimum Version (so a +// lower-versioned freeze wins) and the earliest FrozenAt (the original freeze time). Either +// argument may be nil. Returns a fresh RegistryFreeze pointer. 
+func mergeFreeze(a, b *base.RegistryFreeze) *base.RegistryFreeze { + if a == nil && b == nil { + return nil + } + if a == nil { + cp := *b + return &cp + } + if b == nil { + cp := *a + return &cp + } + out := &base.RegistryFreeze{ + Version: base.MinClusterCompatVersion(a.Version, b.Version), + FrozenAt: a.FrozenAt, + } + if b.FrozenAt.Before(out.FrozenAt) { + out.FrozenAt = b.FrozenAt + } + return out +} + +// Freeze captures the currently-reported cluster compat version into every tracked bucket +// registry, pinning the cluster from advancing past it. Success requires that every tracked +// bucket accepts the freeze: a partial freeze would leave bucket registries (and any +// downgrade-gate decisions keyed off them) in an inconsistent state. +// +// Returns ErrFreezeNoVersion if no version has been observed yet (e.g. immediately after +// startup). Returns ErrFreezeNoBucketsWritten if there are no tracked buckets at all. +// Returns ErrFreezePartial alongside whatever aggregate freeze did take effect when one or +// more tracked buckets failed to accept the freeze — callers should surface the aggregate to +// the admin so they can see which version (if any) is currently pinned. +// +// On full success the manager's cached freeze and reported version are updated and the +// returned record is the aggregate freeze now in effect across all tracked buckets. On +// partial failure the cache is updated to reflect what did take effect, and the returned +// aggregate is the partial freeze. If *no* bucket accepted the freeze (succeeded==0) the +// cache is deliberately left untouched: a transient outage that fails every bucket must +// not erase a real, persistent freeze from the reporting endpoint — Refresh will +// self-heal once the buckets come back. +// +// Locking: the cached cluster compat version is snapshotted under m.mu.RLock at the start +// and the RLock is held across all bucket writes. 
This blocks concurrent Refresh from +// completing its post-I/O write phase while a freeze is in progress, so the version we +// write is guaranteed not to shift relative to what was reported by GET. Bucket I/O is +// otherwise rare admin work, so the contention cost is negligible. +// +// Cross-bucket drift on retry: SetRegistryFreeze is idempotent — once a bucket has a freeze +// record, subsequent calls return the existing record unchanged. If a Freeze partially +// fails and the cluster's live-node minimum then advances before the admin retries, the +// retry will leave already-frozen buckets at the original version while writing the newer +// version to the previously-failed buckets. The mergeFreeze aggregate keeps the reported +// version at the minimum so clients see a consistent CCV, but per-bucket HWM caps (see +// RegisterNodeVersion in config_manager.go) use each bucket's local freeze. The practical +// effect is that HWM may advance further on the higher-version bucket — which errs toward +// refusing downgrades, the safe direction. +func (m *clusterCompatManager) Freeze(ctx context.Context) (*base.RegistryFreeze, error) { + m.mu.RLock() + if m.cachedVersion == nil { + m.mu.RUnlock() + return nil, ErrFreezeNoVersion + } + current := *m.cachedVersion + buckets := make([]string, 0, len(m.trackedBuckets)) + for b := range m.trackedBuckets { + buckets = append(buckets, b) + } + if len(buckets) == 0 { + m.mu.RUnlock() + return nil, ErrFreezeNoBucketsWritten + } + // Hold RLock across the bucket writes so cachedVersion cannot shift under us. 
+ var aggregate *base.RegistryFreeze + succeeded := 0 + for _, bucket := range buckets { + freeze, err := m.sc.BootstrapContext.SetRegistryFreeze(ctx, bucket, current) + if err != nil { + base.WarnfCtx(ctx, "Failed to set cluster compat version freeze in bucket %s: %v", base.MD(bucket), err) + continue + } + succeeded++ + aggregate = mergeFreeze(aggregate, freeze) + } + m.mu.RUnlock() + + // Only mutate the cache when at least one bucket accepted the freeze. If every bucket + // failed, aggregate is nil and writing it would wipe any previously-cached freeze — a + // transient outage should not erase persistent state. Refresh self-heals from the + // authoritative bucket registries on its next tick. + if succeeded > 0 { + m.mu.Lock() + m.cachedFreeze = aggregate + m.cachedVersion = computeCCV(m.cachedNodes, m.cachedFreeze) + m.mu.Unlock() + } + + if succeeded < len(buckets) { + base.WarnfCtx(ctx, "Cluster compatibility version freeze did not fully apply: %d/%d tracked buckets accepted the freeze", succeeded, len(buckets)) + return aggregate, ErrFreezePartial + } + base.InfofCtx(ctx, base.KeyConfig, "Cluster compatibility version frozen at %v (applied to %d/%d tracked buckets)", &aggregate.Version, succeeded, len(buckets)) + return aggregate, nil +} + +// Unfreeze clears the freeze from every tracked bucket registry. Unlike Freeze, this is +// success-on-all: if any bucket still has a freeze record after the operation, the cluster +// remains held back, so an error is returned along with the residual freeze record so the +// caller can surface the remaining state to the admin. +// +// Returns ErrUnfreezePartial in two situations the caller should disambiguate using the +// returned residual: +// - residual != nil: at least one bucket still has a freeze record on re-read. The cache +// is updated to that aggregate so the reporting endpoint reflects on-disk truth. 
+// - residual == nil: one or more bucket clears failed AND the post-failure re-reads also +// failed, so the actual on-disk state is unknown. The cache is deliberately left +// untouched in this case — the pre-op snapshot is the most honest representation until +// the next periodic Refresh self-heals from authoritative bucket state. +// +// previousFreeze is the aggregate freeze that was in effect at the start of the call +// (captured under lock before any mutation) — i.e. the freeze record that the unfreeze is +// attempting to lift, not what got cleared. It is returned even on partial failure so +// handlers can populate audit fields or error messages without a separate peek-then-clear +// that could race with Refresh. +func (m *clusterCompatManager) Unfreeze(ctx context.Context) (previousFreeze, residual *base.RegistryFreeze, err error) { + m.mu.RLock() + if m.cachedFreeze != nil { + cp := *m.cachedFreeze + previousFreeze = &cp + } + m.mu.RUnlock() + + buckets := m.trackedBucketList() + if len(buckets) == 0 { + m.mu.Lock() + m.cachedFreeze = nil + m.cachedVersion = computeCCV(m.cachedNodes, nil) + m.mu.Unlock() + return previousFreeze, nil, nil + } + clearFailed := 0 + for _, bucket := range buckets { + err := m.sc.BootstrapContext.ClearRegistryFreeze(ctx, bucket) + if err != nil { + clearFailed++ + base.WarnfCtx(ctx, "Failed to clear cluster compat version freeze in bucket %s: %v", base.MD(bucket), err) + // Re-read to discover the freeze still in effect on this bucket. + registry, getErr := m.sc.BootstrapContext.getGatewayRegistry(ctx, bucket) + if getErr != nil { + base.WarnfCtx(ctx, "Failed to re-read registry for bucket %s after clear failure: %v", base.MD(bucket), getErr) + continue + } + if registry.Frozen != nil { + residual = mergeFreeze(residual, registry.Frozen) + } + } + } + // Only mutate the cache when we have certainty about on-disk state: a full success + // (clearFailed == 0) clears it; a verified residual replaces it. 
If buckets failed and + // re-reads also failed, leave the cache as-is so the reporting endpoint keeps showing + // the pre-op snapshot until Refresh self-heals. + if clearFailed == 0 || residual != nil { + m.mu.Lock() + m.cachedFreeze = residual + m.cachedVersion = computeCCV(m.cachedNodes, m.cachedFreeze) + m.mu.Unlock() + } + if residual != nil { + base.WarnfCtx(ctx, "Cluster compatibility version unfreeze did not fully apply: %d/%d buckets failed; cluster still frozen at %v", clearFailed, len(buckets), &residual.Version) + return previousFreeze, residual, ErrUnfreezePartial + } + if clearFailed > 0 { + // Buckets failed and re-read couldn't verify residual state — return error anyway, + // since the admin asked for a guarantee and we can't make one. Cache is preserved. + return previousFreeze, nil, ErrUnfreezePartial + } + base.InfofCtx(ctx, base.KeyConfig, "Cluster compatibility version freeze cleared on %d buckets", len(buckets)) + return previousFreeze, nil, nil } // clusterCompatVersionEqual compares two possibly-nil ClusterCompatVersion pointers. diff --git a/rest/cluster_compat_audit_test.go b/rest/cluster_compat_audit_test.go new file mode 100644 index 0000000000..da5aa44dbc --- /dev/null +++ b/rest/cluster_compat_audit_test.go @@ -0,0 +1,91 @@ +// Copyright 2026-Present Couchbase, Inc. +// +// Use of this software is governed by the Business Source License included +// in the file licenses/BSL-Couchbase.txt. As of the Change Date specified +// in that file, in accordance with the Business Source License, use of this +// software will be governed by the Apache License, Version 2.0, included in +// the file licenses/APL2.txt. 
+ +//go:build !race + +package rest + +import ( + "net/http" + "testing" + + "github.com/couchbase/sync_gateway/base" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" +) + +// TestClusterCompatFreezeRESTRoundTrip exercises the admin REST endpoints end-to-end and +// verifies each one emits its expected audit event. Audit logging is EE-only, so this test +// is skipped on CE. +func TestClusterCompatFreezeRESTRoundTrip(t *testing.T) { + rt := createAuditLoggingRestTester(t) + defer rt.Close() + RequireStatus(t, rt.CreateDatabase("db", rt.NewDbConfig()), http.StatusCreated) + + ctx := base.TestCtx(t) + ccm := rt.ServerContext().ClusterCompat + require.NotNil(t, ccm) + ccm.Refresh(ctx) + + // GET before freeze: cluster_compat_version + nodes, no frozen_cluster_compat_version. + var initial ClusterCompatVersionState + output := base.AuditLogContents(t, func(t testing.TB) { + resp := rt.SendAdminRequest(http.MethodGet, "/_cluster_compat_version", "") + RequireStatus(t, resp, http.StatusOK) + require.NoError(t, base.JSONUnmarshal(resp.BodyBytes(), &initial)) + }) + require.NotNil(t, initial.ClusterCompatVersion) + assert.Equal(t, base.NodeClusterCompatVersion, *initial.ClusterCompatVersion) + assert.NotEmpty(t, initial.Nodes) + assert.Nil(t, initial.FrozenClusterCompatVersion) + requireAuditEventPresent(t, output, base.AuditIDClusterCompatVersionRead) + + // POST /freeze: returns 200 with frozen_cluster_compat_version populated. 
+ var frozen ClusterCompatVersionState + output = base.AuditLogContents(t, func(t testing.TB) { + resp := rt.SendAdminRequest(http.MethodPost, "/_cluster_compat_version/freeze", "") + RequireStatus(t, resp, http.StatusOK) + require.NoError(t, base.JSONUnmarshal(resp.BodyBytes(), &frozen)) + }) + require.NotNil(t, frozen.FrozenClusterCompatVersion) + assert.Equal(t, base.NodeClusterCompatVersion, *frozen.FrozenClusterCompatVersion) + requireAuditEventPresent(t, output, base.AuditIDClusterCompatVersionFreeze) + + // GET after freeze: same shape. + resp := rt.SendAdminRequest(http.MethodGet, "/_cluster_compat_version", "") + RequireStatus(t, resp, http.StatusOK) + var afterFreeze ClusterCompatVersionState + require.NoError(t, base.JSONUnmarshal(resp.BodyBytes(), &afterFreeze)) + require.NotNil(t, afterFreeze.FrozenClusterCompatVersion) + + // POST /unfreeze: 200 with frozen_cluster_compat_version omitted. + var unfrozen ClusterCompatVersionState + output = base.AuditLogContents(t, func(t testing.TB) { + resp := rt.SendAdminRequest(http.MethodPost, "/_cluster_compat_version/unfreeze", "") + RequireStatus(t, resp, http.StatusOK) + require.NoError(t, base.JSONUnmarshal(resp.BodyBytes(), &unfrozen)) + }) + assert.Nil(t, unfrozen.FrozenClusterCompatVersion) + requireAuditEventPresent(t, output, base.AuditIDClusterCompatVersionUnfreeze) +} + +// requireAuditEventPresent asserts that the given audit log buffer contains at least one +// event with the expected audit ID. 
+func requireAuditEventPresent(t testing.TB, output []byte, expected base.AuditID) { + t.Helper() + for _, event := range jsonLines(t, output) { + id, ok := event["id"] + if !ok { + continue + } + if base.AuditID(id.(float64)) == expected { + return + } + } + t.Fatalf("expected audit event %d (%s) not found in audit log", expected, base.AuditEvents[expected].Name) +} diff --git a/rest/cluster_compat_test.go b/rest/cluster_compat_test.go index ba74601889..74ea606491 100644 --- a/rest/cluster_compat_test.go +++ b/rest/cluster_compat_test.go @@ -115,7 +115,7 @@ func TestRegisterNodeVersionCASRetry(t *testing.T) { var eg errgroup.Group for i := 0; i < n; i++ { eg.Go(func() error { - _, err := bc.RegisterNodeVersion(ctx, bucketName, fmt.Sprintf("node-%d", i), "", version, nil, time.Hour) + _, err := bc.RegisterNodeVersion(ctx, bucketName, fmt.Sprintf("node-%d", i), "", version, nil, time.Hour, true) return err }) } @@ -142,7 +142,7 @@ func TestDeregisterNodeVersionCASRetry(t *testing.T) { const n = 10 version := base.NodeClusterCompatVersion for i := 0; i < n; i++ { - _, err := bc.RegisterNodeVersion(ctx, bucketName, fmt.Sprintf("node-%d", i), "", version, nil, time.Hour) + _, err := bc.RegisterNodeVersion(ctx, bucketName, fmt.Sprintf("node-%d", i), "", version, nil, time.Hour, true) require.NoError(t, err) } @@ -280,13 +280,13 @@ func TestClusterCompatPruneSelfNotPruned(t *testing.T) { require.NotNil(t, ccm) // Make sure self is registered, then make its heartbeat ancient. - _, err := bc.RegisterNodeVersion(ctx, bucketName, selfUID, "", base.NodeClusterCompatVersion, nil, time.Hour) + _, err := bc.RegisterNodeVersion(ctx, bucketName, selfUID, "", base.NodeClusterCompatVersion, nil, time.Hour, true) require.NoError(t, err) staleTime := time.Now().Add(-100 * ccm.heartbeatExpiry()) setNodeHeartbeatAt(t, rt, bucketName, selfUID, staleTime) // Re-register with a non-zero expiry. Self must survive and have a fresh heartbeat. 
- registry, err := bc.RegisterNodeVersion(ctx, bucketName, selfUID, "", base.NodeClusterCompatVersion, nil, ccm.heartbeatExpiry()) + registry, err := bc.RegisterNodeVersion(ctx, bucketName, selfUID, "", base.NodeClusterCompatVersion, nil, ccm.heartbeatExpiry(), true) require.NoError(t, err) require.Contains(t, registry.Nodes, selfUID) assert.True(t, registry.Nodes[selfUID].HeartbeatAt.After(staleTime), "self's heartbeat must have been refreshed") @@ -403,17 +403,24 @@ func TestClusterCompatDowngradeAllowedSameOrOlderPeers(t *testing.T) { } // TestClusterCompatDowngradeEmptyRegistry verifies that creating a database against a fresh -// (empty) bucket succeeds and ratchets ClusterCompatVersionHWM up to the node's compat version. +// (empty) bucket succeeds and ratchets ClusterCompatVersionHWM up to the node's compat +// version. The ratchet is performed by Refresh once the database is online — drive it +// explicitly here rather than waiting for the periodic ticker. func TestClusterCompatDowngradeEmptyRegistry(t *testing.T) { rt := NewRestTesterPersistentConfig(t) defer rt.Close() + ctx := base.TestCtx(t) bucketName := rt.Bucket().GetName() bc := rt.ServerContext().BootstrapContext - registry, err := bc.getGatewayRegistry(base.TestCtx(t), bucketName) + ccm := rt.ServerContext().ClusterCompat + require.NotNil(t, ccm) + ccm.Refresh(ctx) + + registry, err := bc.getGatewayRegistry(ctx, bucketName) require.NoError(t, err) - assert.Equal(t, base.NodeClusterCompatVersion, registry.ClusterCompatVersionHWM, "HWM should be ratcheted to node version after first apply") + assert.Equal(t, base.NodeClusterCompatVersion, registry.ClusterCompatVersionHWM, "HWM should be ratcheted to node version after first refresh") } // TestClusterCompatDowngradeBlockedByPersistentHWM verifies the persistent floor: a bucket @@ -462,6 +469,10 @@ func TestClusterCompatDowngradeHWMRatchets(t *testing.T) { bc := rt.ServerContext().BootstrapContext bucketName := rt.Bucket().GetName() + ccm := 
rt.ServerContext().ClusterCompat + require.NotNil(t, ccm) + ccm.Refresh(ctx) + registry, err := bc.getGatewayRegistry(ctx, bucketName) require.NoError(t, err) require.Equal(t, base.NodeClusterCompatVersion, registry.ClusterCompatVersionHWM) @@ -470,7 +481,7 @@ func TestClusterCompatDowngradeHWMRatchets(t *testing.T) { registry.ClusterCompatVersionHWM = preserved require.NoError(t, bc.setGatewayRegistry(ctx, bucketName, registry)) - _, err = bc.RegisterNodeVersion(ctx, bucketName, "lower-peer", "", base.NewClusterCompatVersion(0, 1), nil, time.Hour) + _, err = bc.RegisterNodeVersion(ctx, bucketName, "lower-peer", "", base.NewClusterCompatVersion(0, 1), nil, time.Hour, true) require.Error(t, err, "lower-version registration must be rejected when HWM is higher") require.Contains(t, err.Error(), "newer Sync Gateway cluster compat version") @@ -491,7 +502,11 @@ func TestClusterCompatDowngradeHWMTracksMinAcrossNodes(t *testing.T) { bc := rt.ServerContext().BootstrapContext bucketName := rt.Bucket().GetName() - // After auto-create db, HWM == self version (only node). + ccm := rt.ServerContext().ClusterCompat + require.NotNil(t, ccm) + ccm.Refresh(ctx) + + // After auto-create db + first refresh, HWM == self version (only node). registry, err := bc.getGatewayRegistry(ctx, bucketName) require.NoError(t, err) require.Equal(t, base.NodeClusterCompatVersion, registry.ClusterCompatVersionHWM) @@ -499,7 +514,7 @@ func TestClusterCompatDowngradeHWMTracksMinAcrossNodes(t *testing.T) { // Add a higher-version peer. Cluster compat is still min(self, higher) == self, so HWM // must not budge. 
higher := base.NewClusterCompatVersion(base.NodeClusterCompatVersion.Major+1, 0) - _, err = bc.RegisterNodeVersion(ctx, bucketName, "higher-peer", "", higher, nil, time.Hour) + _, err = bc.RegisterNodeVersion(ctx, bucketName, "higher-peer", "", higher, nil, time.Hour, true) require.NoError(t, err) registry, err = bc.getGatewayRegistry(ctx, bucketName) require.NoError(t, err) @@ -1206,7 +1221,312 @@ func TestIsConfigFullyAppliedNoEligibleAckersIntegration(t *testing.T) { assert.Empty(t, missing) } +// TestClusterCompatFreezeAndUnfreeze exercises the manager's Freeze/Unfreeze methods: +// Freeze captures the current cluster compat version into the registry; Unfreeze clears it. +func TestClusterCompatFreezeAndUnfreeze(t *testing.T) { + rt := NewRestTesterPersistentConfig(t) + defer rt.Close() + + ctx := base.TestCtx(t) + bc := rt.ServerContext().BootstrapContext + bucketName := rt.Bucket().GetName() + + ccm := rt.ServerContext().ClusterCompat + require.NotNil(t, ccm) + ccm.Refresh(ctx) + require.NotNil(t, ccm.ClusterCompatVersion()) + + freeze, err := ccm.Freeze(ctx) + require.NoError(t, err) + require.NotNil(t, freeze) + assert.Equal(t, base.NodeClusterCompatVersion, freeze.Version) + assert.False(t, freeze.FrozenAt.IsZero()) + + registry, err := bc.getGatewayRegistry(ctx, bucketName) + require.NoError(t, err) + require.NotNil(t, registry.Frozen, "freeze should be persisted to bucket registry") + assert.Equal(t, freeze.Version, registry.Frozen.Version) + + cleared, residual, err := ccm.Unfreeze(ctx) + require.NoError(t, err) + assert.Nil(t, residual) + require.NotNil(t, cleared, "Unfreeze should return the freeze record that was cleared") + assert.Equal(t, base.NodeClusterCompatVersion, cleared.Version) + + registry, err = bc.getGatewayRegistry(ctx, bucketName) + require.NoError(t, err) + assert.Nil(t, registry.Frozen, "freeze should be cleared from bucket registry") + assert.Nil(t, ccm.getCachedFreeze()) +} + +// TestClusterCompatFreezeIdempotent verifies that 
calling Freeze a second time returns the +// existing freeze record rather than refreshing FrozenAt. +func TestClusterCompatFreezeIdempotent(t *testing.T) { + rt := NewRestTesterPersistentConfig(t) + defer rt.Close() + + ctx := base.TestCtx(t) + ccm := rt.ServerContext().ClusterCompat + require.NotNil(t, ccm) + ccm.Refresh(ctx) + + first, err := ccm.Freeze(ctx) + require.NoError(t, err) + require.NotNil(t, first) + + second, err := ccm.Freeze(ctx) + require.NoError(t, err) + require.NotNil(t, second) + assert.Equal(t, first.Version, second.Version) + assert.True(t, second.FrozenAt.Equal(first.FrozenAt), "FrozenAt should not change on a no-op re-freeze") +} + +// TestClusterCompatFreezePinsAcrossNodeAdvances seeds peer nodes with a higher version into the +// registry and verifies that, while frozen, the reported cluster compat version stays at the +// captured value rather than advancing to the live-node minimum. +func TestClusterCompatFreezePinsAcrossNodeAdvances(t *testing.T) { + rt := NewRestTesterPersistentConfig(t) + defer rt.Close() + + ctx := base.TestCtx(t) + bucketName := rt.Bucket().GetName() + + ccm := rt.ServerContext().ClusterCompat + require.NotNil(t, ccm) + ccm.Refresh(ctx) + + freeze, err := ccm.Freeze(ctx) + require.NoError(t, err) + require.NotNil(t, freeze) + frozenVersion := freeze.Version + + // Seed a peer at a higher version directly into the registry. The freeze pins reporting + // at frozenVersion regardless of whether peer registrations would have advanced the + // live-node minimum. 
+ higher := base.NewClusterCompatVersion(frozenVersion.Major+1, 0) + seedRegistryNode(t, rt, bucketName, "synthetic-peer", higher) + + ccm.lastRefreshAt = time.Time{} + ccm.Refresh(ctx) + + got := ccm.ClusterCompatVersion() + require.NotNil(t, got) + assert.Equal(t, frozenVersion, *got, "while frozen, reported version should stay at the frozen value") +} + +// TestClusterCompatFreezePreventsHWMAdvance verifies the rollback-preservation contract of +// the freeze: while a freeze is in effect, a subsequent RegisterNodeVersion at a higher +// version (e.g. a node that has been upgraded past the frozen version) must not ratchet +// ClusterCompatVersionHWM past the freeze. If HWM advances, the downgrade gate would later +// refuse rolling that node back to the frozen version — defeating the freeze's purpose. +func TestClusterCompatFreezePreventsHWMAdvance(t *testing.T) { + lowVersion := base.NewClusterCompatVersion(1, 0) + rt := NewRestTester(t, &RestTesterConfig{ + PersistentConfig: true, + nodeClusterCompatVersion: &lowVersion, + }) + defer rt.Close() + RequireStatus(t, rt.CreateDatabase("db", rt.NewDbConfig()), http.StatusCreated) + + ctx := base.TestCtx(t) + bc := rt.ServerContext().BootstrapContext + bucketName := rt.Bucket().GetName() + + ccm := rt.ServerContext().ClusterCompat + require.NotNil(t, ccm) + ccm.Refresh(ctx) + + freeze, err := ccm.Freeze(ctx) + require.NoError(t, err) + require.Equal(t, lowVersion, freeze.Version) + + // Simulate a node upgrade by re-registering self at a higher version. Without the + // freeze ceiling, RegisterNodeVersion would ratchet HWM up to this higher value. 
+ higher := base.NewClusterCompatVersion(2, 0) + _, err = bc.RegisterNodeVersion(ctx, bucketName, rt.ServerContext().NodeUID, "", higher, nil, time.Hour, true) + require.NoError(t, err) + + registry, err := bc.getGatewayRegistry(ctx, bucketName) + require.NoError(t, err) + assert.Equal(t, lowVersion, registry.ClusterCompatVersionHWM, "HWM must not advance past the frozen version") +} + +// TestClusterCompatFreezeBeforeVersion verifies the manager refuses to freeze when no +// cluster compat version has been observed yet (e.g. before RegisterBucket has run). +func TestClusterCompatFreezeBeforeVersion(t *testing.T) { + rt := NewRestTesterPersistentConfig(t) + defer rt.Close() + + // Construct a fresh manager that has never refreshed — its cachedVersion is nil. + freshManager := &clusterCompatManager{sc: rt.ServerContext()} + ctx := base.TestCtx(t) + freshManager.Start(ctx) + defer freshManager.Stop(ctx) + + _, err := freshManager.Freeze(ctx) + assert.ErrorIs(t, err, ErrFreezeNoVersion) +} + +// TestClusterCompatFreezePartialFailure verifies that when a tracked bucket cannot be +// frozen (e.g. its registry doc is unparseable), Freeze returns ErrFreezePartial rather +// than silently succeeding. This makes Freeze success-on-all (mirror of Unfreeze), so the +// admin gets a clear error and can see in the response which buckets did get frozen. 
+func TestClusterCompatFreezePartialFailure(t *testing.T) { + rt := NewRestTesterPersistentConfig(t) + defer rt.Close() + + ctx := base.TestCtx(t) + ccm := rt.ServerContext().ClusterCompat + require.NotNil(t, ccm) + ccm.Refresh(ctx) + + corruptGatewayRegistry(t, rt, rt.Bucket().GetName()) + + aggregate, err := ccm.Freeze(ctx) + assert.ErrorIs(t, err, ErrFreezePartial) + assert.Nil(t, aggregate, "no bucket accepted the freeze, so the aggregate must be nil") + assert.Nil(t, ccm.getCachedFreeze(), "cache must remain unset when no bucket accepted the freeze") +} + +// TestClusterCompatFreezePartialFailureREST verifies the REST handler returns 503 with a +// ClusterCompatVersionState body when Freeze partially fails — mirroring unfreeze. The +// body must report the live cluster compat version but no frozen version, since the +// failed freeze did not pin any value. +func TestClusterCompatFreezePartialFailureREST(t *testing.T) { + rt := NewRestTesterPersistentConfig(t) + defer rt.Close() + + ctx := base.TestCtx(t) + ccm := rt.ServerContext().ClusterCompat + require.NotNil(t, ccm) + ccm.Refresh(ctx) + + corruptGatewayRegistry(t, rt, rt.Bucket().GetName()) + + resp := rt.SendAdminRequest(http.MethodPost, "/_cluster_compat_version/freeze", "") + RequireStatus(t, resp, http.StatusServiceUnavailable) + var state ClusterCompatVersionState + require.NoError(t, base.JSONUnmarshal(resp.BodyBytes(), &state)) + assert.Nil(t, state.FrozenClusterCompatVersion, "no bucket accepted the freeze, so no frozen version should be reported") + assert.NotNil(t, state.ClusterCompatVersion, "the live cluster compat version should still be reported after a failed freeze") +} + +// TestClusterCompatFreezePreservesCacheOnTotalFailure verifies that if Freeze fails on +// every tracked bucket (succeeded==0), the previously-cached freeze is preserved rather +// than wiped to nil. 
A transient bucket outage that prevents any SetRegistryFreeze from +// succeeding must not erase a real, persistent freeze from the reporting endpoint — the +// next periodic Refresh self-heals from the authoritative bucket registries. +func TestClusterCompatFreezePreservesCacheOnTotalFailure(t *testing.T) { + rt := NewRestTesterPersistentConfig(t) + defer rt.Close() + + ctx := base.TestCtx(t) + ccm := rt.ServerContext().ClusterCompat + require.NotNil(t, ccm) + ccm.Refresh(ctx) + + preFreeze, err := ccm.Freeze(ctx) + require.NoError(t, err) + require.NotNil(t, preFreeze) + require.NotNil(t, ccm.getCachedFreeze(), "cache must have a freeze record after successful Freeze") + + // Corrupt the registry so subsequent SetRegistryFreeze calls fail at the getRegistry + // step — driving the all-buckets-failed (succeeded==0, aggregate==nil) branch. + corruptGatewayRegistry(t, rt, rt.Bucket().GetName()) + + aggregate, err := ccm.Freeze(ctx) + assert.ErrorIs(t, err, ErrFreezePartial) + assert.Nil(t, aggregate, "no bucket accepted the freeze, so the aggregate must be nil") + + cached := ccm.getCachedFreeze() + require.NotNil(t, cached, "cache must be preserved when no bucket accepted the freeze") + assert.Equal(t, preFreeze.Version, cached.Version, "cache should still reflect the pre-failure freeze") + assert.True(t, cached.FrozenAt.Equal(preFreeze.FrozenAt), "FrozenAt should not have shifted") +} + +// corruptGatewayRegistry overwrites the registry doc on the given bucket with a non-object +// JSON value so that getGatewayRegistry's unmarshal fails. This simulates a bucket whose +// registry has become inaccessible — used to drive the partial-failure branch of Unfreeze. 
+func corruptGatewayRegistry(t *testing.T, rt *RestTester, bucketName string) { + t.Helper() + ctx := base.TestCtx(t) + conn := rt.ServerContext().BootstrapContext.Connection + var existing map[string]any + cas, err := conn.GetMetadataDocument(ctx, bucketName, base.SGRegistryKey, &existing) + require.NoError(t, err) + _, err = conn.WriteMetadataDocument(ctx, bucketName, base.SGRegistryKey, cas, "corrupted-registry-for-test") + require.NoError(t, err) +} + +// TestClusterCompatUnfreezePartialFailure verifies that when ClearRegistryFreeze fails on +// a tracked bucket AND the residual re-read also fails, Unfreeze returns +// ErrUnfreezePartial with residual==nil. The bucket's registry is overwritten with an +// unparseable value so getGatewayRegistry fails — which propagates through both +// ClearRegistryFreeze and the residual re-read, hitting the clearFailed>0 / residual==nil +// branch. Also verifies the cache is preserved (not wiped to nil) so admins see a stable +// view until Refresh self-heals. 
+func TestClusterCompatUnfreezePartialFailure(t *testing.T) { + rt := NewRestTesterPersistentConfig(t) + defer rt.Close() + + ctx := base.TestCtx(t) + ccm := rt.ServerContext().ClusterCompat + require.NotNil(t, ccm) + ccm.Refresh(ctx) + + preFreeze, err := ccm.Freeze(ctx) + require.NoError(t, err) + require.NotNil(t, preFreeze) + + corruptGatewayRegistry(t, rt, rt.Bucket().GetName()) + + cleared, residual, err := ccm.Unfreeze(ctx) + assert.ErrorIs(t, err, ErrUnfreezePartial) + require.NotNil(t, cleared, "Unfreeze should return the pre-op freeze record so the handler can surface it") + assert.Equal(t, preFreeze.Version, cleared.Version) + assert.Nil(t, residual, "re-read should fail on the corrupted registry, leaving residual unknown") + + cached := ccm.getCachedFreeze() + require.NotNil(t, cached, "cache must be preserved when residual state could not be verified") + assert.Equal(t, preFreeze.Version, cached.Version, "cache should still reflect the pre-op freeze") +} + +// TestClusterCompatUnfreezePartialFailureREST verifies the REST handler returns 503 with +// an HTTP-Error body (not a ClusterCompatVersionState) when Unfreeze fails and the +// residual state could not be verified. The error reason must include the previously- +// frozen version so the admin has a recovery target. 
+func TestClusterCompatUnfreezePartialFailureREST(t *testing.T) { + rt := NewRestTesterPersistentConfig(t) + defer rt.Close() + + ctx := base.TestCtx(t) + ccm := rt.ServerContext().ClusterCompat + require.NotNil(t, ccm) + ccm.Refresh(ctx) + + resp := rt.SendAdminRequest(http.MethodPost, "/_cluster_compat_version/freeze", "") + RequireStatus(t, resp, http.StatusOK) + + preFreezeVersion := base.NodeClusterCompatVersion.String() + + corruptGatewayRegistry(t, rt, rt.Bucket().GetName()) + + resp = rt.SendAdminRequest(http.MethodPost, "/_cluster_compat_version/unfreeze", "") + RequireStatus(t, resp, http.StatusServiceUnavailable) + + var httpErr struct { + Error string `json:"error"` + Reason string `json:"reason"` + } + require.NoError(t, base.JSONUnmarshal(resp.BodyBytes(), &httpErr)) + assert.NotEmpty(t, httpErr.Error) + assert.Contains(t, httpErr.Reason, preFreezeVersion, "error reason should name the previously-frozen version so the admin has a recovery target") +} + // TestClusterCompatRefreshIntervalUnclamped verifies refreshInterval returns the configured +// ConfigUpdateFrequency verbatim — no silent floor. The validator is responsible for rejecting +// pathological combinations (see TestStartupConfigNodeHeartbeatExpiryValidation), so the +// runtime must not disagree with what the validator approved. func TestClusterCompatRefreshIntervalUnclamped(t *testing.T) { rt := NewRestTesterPersistentConfig(t) defer rt.Close() diff --git a/rest/config_manager.go b/rest/config_manager.go index 60ae4eb376..53660aca56 100644 --- a/rest/config_manager.go +++ b/rest/config_manager.go @@ -720,8 +720,15 @@ func (b *bootstrapContext) setGatewayRegistry(ctx context.Context, bucketName st // heartbeatExpiry must be positive; the validator enforces this for user-supplied configs and // the runtime fallback ensures a sane default (see clusterCompatManager.heartbeatExpiry). // +// ratchetHWM controls whether this call is allowed to advance ClusterCompatVersionHWM. 
Pass +// false for the first registration during database load (e.g. the RegisterBucket call from +// _applyConfig) — HWM is monotonic and a ratchet committed off transient startup state can +// never be rolled back. The periodic Refresh passes true only for buckets where at least +// one database has reached DBOnline (see clusterCompatManager.ratchetEligibleBuckets). +// Node heartbeat refresh and stale pruning happen in either case. +// // Uses CAS retry on conflict. Returns the registry as written. -func (b *bootstrapContext) RegisterNodeVersion(ctx context.Context, bucketName, nodeUID, groupID string, version base.ClusterCompatVersion, databases map[string]string, heartbeatExpiry time.Duration) (*GatewayRegistry, error) { +func (b *bootstrapContext) RegisterNodeVersion(ctx context.Context, bucketName, nodeUID, groupID string, version base.ClusterCompatVersion, databases map[string]string, heartbeatExpiry time.Duration, ratchetHWM bool) (*GatewayRegistry, error) { for attempt := 1; attempt <= nodeVersionUpdateMaxRetryAttempts; attempt++ { registry, err := b.getGatewayRegistry(ctx, bucketName) if err != nil { @@ -742,11 +749,24 @@ func (b *bootstrapContext) RegisterNodeVersion(ctx context.Context, bucketName, } // Ratchet ClusterCompatVersionHWM up to the current cluster compat version (min over // all registered nodes). Never decreases — if a lower-version node joins, HWM stays. - ccv := minRegistryNodeClusterCompatVersion(registry.Nodes) - hwmBumped := ccv.GreaterThan(registry.ClusterCompatVersionHWM) - previousHWM := registry.ClusterCompatVersionHWM - if hwmBumped { - registry.ClusterCompatVersionHWM = ccv + // While a freeze is in effect, the freeze version is a ceiling on advancement: HWM + // must not climb past it, otherwise the downgrade gate above would later block rolling + // any node back to the frozen version (the freeze's whole purpose). 
+ // + // Gated on ratchetHWM so startup-window registrations can refresh node heartbeat + // without committing HWM off transient state. + var hwmBumped bool + var previousHWM, ccv base.ClusterCompatVersion + if ratchetHWM { + ccv = minRegistryNodeClusterCompatVersion(registry.Nodes) + if registry.Frozen != nil && ccv.GreaterThan(registry.Frozen.Version) { + ccv = registry.Frozen.Version + } + hwmBumped = ccv.GreaterThan(registry.ClusterCompatVersionHWM) + previousHWM = registry.ClusterCompatVersionHWM + if hwmBumped { + registry.ClusterCompatVersionHWM = ccv + } } err = b.setGatewayRegistry(ctx, bucketName, registry) if err != nil { @@ -806,6 +826,61 @@ func minRegistryNodeClusterCompatVersion(nodes map[string]*base.RegistryNode) ba return base.MinClusterCompatVersion(versions...) } +// SetRegistryFreeze sets the cluster compat version freeze on the given bucket's registry. +// If a freeze is already present, it is left unchanged (idempotent). Uses CAS retry on conflict. +// Returns the freeze record now in effect. 
+func (b *bootstrapContext) SetRegistryFreeze(ctx context.Context, bucketName string, version base.ClusterCompatVersion) (*base.RegistryFreeze, error) { + for attempt := 1; attempt <= nodeVersionUpdateMaxRetryAttempts; attempt++ { + registry, err := b.getGatewayRegistry(ctx, bucketName) + if err != nil { + return nil, fmt.Errorf("failed to get registry for freeze: %w", err) + } + if registry.Frozen != nil { + return registry.Frozen, nil + } + freeze := &base.RegistryFreeze{ + Version: version, + FrozenAt: time.Now().UTC(), + } + registry.Frozen = freeze + err = b.setGatewayRegistry(ctx, bucketName, registry) + if err != nil { + if base.IsCasMismatch(err) { + base.DebugfCtx(ctx, base.KeyConfig, "CAS mismatch setting registry freeze in bucket %s, retrying (attempt %d/%d)", base.MD(bucketName), attempt, nodeVersionUpdateMaxRetryAttempts) + continue + } + return nil, fmt.Errorf("failed to write registry freeze: %w", err) + } + return freeze, nil + } + return nil, base.RedactErrorf("SetRegistryFreeze failed after %d CAS retry attempts for bucket %s", nodeVersionUpdateMaxRetryAttempts, base.MD(bucketName)) +} + +// ClearRegistryFreeze removes the cluster compat version freeze from the given bucket's registry. +// No-op if no freeze is set. Uses CAS retry on conflict. 
+func (b *bootstrapContext) ClearRegistryFreeze(ctx context.Context, bucketName string) error {
+	// Read-modify-write loop: each pass re-reads the registry, so a write that loses a CAS
+	// race is retried against fresh state, up to nodeVersionUpdateMaxRetryAttempts times.
+	for attempt := 1; attempt <= nodeVersionUpdateMaxRetryAttempts; attempt++ {
+		reg, err := b.getGatewayRegistry(ctx, bucketName)
+		if err != nil {
+			return fmt.Errorf("failed to get registry for freeze clear: %w", err)
+		}
+		// Nothing is pinned: report success without writing the registry back.
+		if reg.Frozen == nil {
+			return nil
+		}
+		reg.Frozen = nil
+		err = b.setGatewayRegistry(ctx, bucketName, reg)
+		switch {
+		case err == nil:
+			return nil
+		case base.IsCasMismatch(err):
+			// CAS conflict on write: log and fall through to the next attempt.
+			base.DebugfCtx(ctx, base.KeyConfig, "CAS mismatch clearing registry freeze in bucket %s, retrying (attempt %d/%d)", base.MD(bucketName), attempt, nodeVersionUpdateMaxRetryAttempts)
+		default:
+			return fmt.Errorf("failed to write registry freeze clear: %w", err)
+		}
+	}
+	// Every attempt ended in a CAS mismatch.
+	return base.RedactErrorf("ClearRegistryFreeze failed after %d CAS retry attempts for bucket %s", nodeVersionUpdateMaxRetryAttempts, base.MD(bucketName))
+}
+
 // pruneStaleNodes deletes entries from nodes whose HeartbeatAt is older than expiry. The // selfUID entry is always retained — the caller is responsible for refreshing it. Returns // the UIDs of pruned entries (in non-deterministic order). diff --git a/rest/config_registry.go b/rest/config_registry.go index d5c6fd2cc4..d239fe194b 100644 --- a/rest/config_registry.go +++ b/rest/config_registry.go @@ -50,13 +50,14 @@ var ErrNoEligibleAckers = errors.New("no alive nodes in config group to acknowle // } type GatewayRegistry struct { cas uint64 - Version string `json:"version"` // Registry version - ConfigGroups map[string]*RegistryConfigGroup `json:"config_groups"` // Map of config groups, keyed by config group ID - SGVersion base.ComparableBuildVersion `json:"sg_version"` // Latest patch version of Sync Gateway that touched the registry. Diagnostic-only — not used for cluster compat / downgrade decisions.
- ClusterCompatVersionHWM base.ClusterCompatVersion `json:"cluster_compat_version_hwm,omitempty"` // High-water mark of cluster compat version (major.minor) seen on this bucket. Ratchets up only — never decreases. - UpdatedAt time.Time `json:"updated_at"` // Time the registry was last updated - CreatedAt time.Time `json:"created_at"` // Time the registry was created - Nodes map[string]*base.RegistryNode `json:"nodes,omitempty"` // Map of node UID to node version registration + Version string `json:"version"` // Registry version + ConfigGroups map[string]*RegistryConfigGroup `json:"config_groups"` // Map of config groups, keyed by config group ID + SGVersion base.ComparableBuildVersion `json:"sg_version"` // Latest patch version of Sync Gateway that touched the registry. Diagnostic-only — not used for cluster compat / downgrade decisions. + ClusterCompatVersionHWM base.ClusterCompatVersion `json:"cluster_compat_version_hwm,omitempty"` // High-water mark of cluster compat version (major.minor) seen on this bucket. Ratchets up only — never decreases. + UpdatedAt time.Time `json:"updated_at"` // Time the registry was last updated + CreatedAt time.Time `json:"created_at"` // Time the registry was created + Nodes map[string]*base.RegistryNode `json:"nodes,omitempty"` // Map of node UID to node version registration + Frozen *base.RegistryFreeze `json:"frozen_cluster_compat_version,omitempty"` // Admin-issued cluster compat version freeze } const GatewayRegistryVersion = "1.0" diff --git a/rest/handler_cluster_compat.go b/rest/handler_cluster_compat.go new file mode 100644 index 0000000000..c876d1d1c4 --- /dev/null +++ b/rest/handler_cluster_compat.go @@ -0,0 +1,141 @@ +// Copyright 2026-Present Couchbase, Inc. +// +// Use of this software is governed by the Business Source License included +// in the file licenses/BSL-Couchbase.txt. 
As of the Change Date specified +// in that file, in accordance with the Business Source License, use of this +// software will be governed by the Apache License, Version 2.0, included in +// the file licenses/APL2.txt. + +package rest + +import ( + "errors" + "net/http" + "time" + + "github.com/couchbase/sync_gateway/base" +) + +// ClusterCompatVersionState is the response payload for /_cluster_compat_version. It +// describes the cluster-wide cluster compatibility version, the per-node versions +// registered in the cluster, and (only when set) the frozen value pinning it. +type ClusterCompatVersionState struct { + ClusterCompatVersion *base.ClusterCompatVersion `json:"cluster_compat_version,omitempty"` + Nodes map[string]base.ClusterCompatVersion `json:"nodes,omitempty"` + FrozenClusterCompatVersion *base.ClusterCompatVersion `json:"frozen_cluster_compat_version,omitempty"` +} + +// handleGetClusterCompatVersion returns the cluster compatibility version state. +func (h *handler) handleGetClusterCompatVersion() error { + mgr, err := h.requireClusterCompatManager() + if err != nil { + return err + } + state := buildClusterCompatVersionState(mgr) + base.Audit(h.ctx(), base.AuditIDClusterCompatVersionRead, nil) + h.writeJSON(state) + return nil +} + +// handleFreezeClusterCompatVersion captures the current cluster compatibility version and +// pins the cluster to it. Success requires every tracked bucket accept the freeze; on +// partial failure the current state is written as the body of a 503 response so the admin +// can see which version (if any) ended up pinned. +// +// Audits only on full success — partial-failure attempts are still captured by the standard +// admin HTTP API request audit (AuditIDAdminHTTPAPIRequest). 
+func (h *handler) handleFreezeClusterCompatVersion() error {
+	manager, err := h.requireClusterCompatManager()
+	if err != nil {
+		return err
+	}
+	pinned, err := manager.Freeze(h.ctx())
+	if err != nil {
+		// Map the manager's sentinel errors onto HTTP responses, checked in a fixed order.
+		if errors.Is(err, ErrFreezeNoVersion) {
+			return base.HTTPErrorf(http.StatusServiceUnavailable, "cluster compatibility version not yet computed; retry once GET /_cluster_compat_version returns a version")
+		}
+		if errors.Is(err, ErrFreezeNoBucketsWritten) {
+			return base.HTTPErrorf(http.StatusServiceUnavailable, "could not freeze cluster compatibility version: no tracked bucket registries")
+		}
+		if errors.Is(err, ErrFreezePartial) {
+			// Partial failure: answer 503 but still send the current state as the body,
+			// and return nil so no second error response is produced.
+			h.writeJSONStatus(http.StatusServiceUnavailable, buildClusterCompatVersionState(manager))
+			return nil
+		}
+		return base.HTTPErrorf(http.StatusInternalServerError, "failed to freeze cluster compatibility version: %v", err)
+	}
+	// Full success: audit the freeze record that was returned, then respond with the state.
+	auditCtx := h.ctx()
+	fields := base.AuditFields{
+		base.AuditFieldClusterCompatVersion: pinned.Version.String(),
+		base.AuditFieldFrozenAt:             pinned.FrozenAt.Format(time.RFC3339),
+	}
+	base.Audit(auditCtx, base.AuditIDClusterCompatVersionFreeze, fields)
+	h.writeJSON(buildClusterCompatVersionState(manager))
+	return nil
+}
+
+// handleUnfreezeClusterCompatVersion clears the cluster compatibility version freeze.
+// Strict contract: success only if the cluster is fully unfrozen. On partial failure the
+// response depends on whether the residual on-disk state could be verified:
+// - residual != nil: write the current state as a 503 body so the admin sees which
+// buckets are still holding a freeze.
+// - residual == nil: the on-disk state is unknown (bucket clear and re-read both
+// failed). Return an HTTPError naming the version that was frozen before the attempt
+// so the admin has a recovery target. The OpenAPI 503 oneOf already documents this.
+//
+// Audits only on full success — matching freeze, where the audit records the action
+// having taken effect.
Partial-failure attempts are still captured by the standard admin
+// HTTP API request audit (AuditIDAdminHTTPAPIRequest).
+func (h *handler) handleUnfreezeClusterCompatVersion() error {
+	manager, err := h.requireClusterCompatManager()
+	if err != nil {
+		return err
+	}
+	lifted, residual, err := manager.Unfreeze(h.ctx())
+	if err != nil {
+		// Anything other than a partial unfreeze is reported as an internal error.
+		if !errors.Is(err, ErrUnfreezePartial) {
+			return base.HTTPErrorf(http.StatusInternalServerError, "failed to clear cluster compatibility version freeze: %v", err)
+		}
+		// Partial unfreeze with a known residual: 503 with the current state as the body.
+		if residual != nil {
+			h.writeJSONStatus(http.StatusServiceUnavailable, buildClusterCompatVersionState(manager))
+			return nil
+		}
+		// Partial unfreeze with no residual to report: name the previously frozen version
+		// when one was returned, otherwise fall back to "unknown".
+		prevVersion := "unknown"
+		if lifted != nil {
+			prevVersion = lifted.Version.String()
+		}
+		return base.HTTPErrorf(http.StatusServiceUnavailable, "unfreeze did not fully apply and the residual cluster compatibility version freeze state could not be verified; cluster was previously frozen at %s — retry once the underlying bucket issue is resolved", prevVersion)
+	}
+	// The audit fields describe the freeze record that was lifted; they stay empty when
+	// Unfreeze returned no previous record.
+	fields := base.AuditFields{}
+	if lifted != nil {
+		fields[base.AuditFieldClusterCompatVersion] = lifted.Version.String()
+		fields[base.AuditFieldFrozenAt] = lifted.FrozenAt.Format(time.RFC3339)
+	}
+	base.Audit(h.ctx(), base.AuditIDClusterCompatVersionUnfreeze, fields)
+	h.writeJSON(buildClusterCompatVersionState(manager))
+	return nil
+}
+
+// requireClusterCompatManager returns the running clusterCompatManager, or a 503 if none is
+// installed (e.g. when running without persistent config and CCV is not initialised).
+func (h *handler) requireClusterCompatManager() (*clusterCompatManager, error) { + mgr := h.server.ClusterCompat + if mgr == nil { + return nil, base.HTTPErrorf(http.StatusServiceUnavailable, "cluster compatibility version tracking is not enabled on this node") + } + return mgr, nil +} + +// buildClusterCompatVersionState assembles the response payload from the manager's +// currently-cached state. Per-node timestamps are deliberately not exposed. +func buildClusterCompatVersionState(mgr *clusterCompatManager) ClusterCompatVersionState { + state := ClusterCompatVersionState{ + ClusterCompatVersion: mgr.ClusterCompatVersion(), + Nodes: mgr.NodeVersions(), + } + if freeze := mgr.getCachedFreeze(); freeze != nil { + v := freeze.Version + state.FrozenClusterCompatVersion = &v + } + return state +} diff --git a/rest/routing.go b/rest/routing.go index 973f6d6a3c..9bcf2581ec 100644 --- a/rest/routing.go +++ b/rest/routing.go @@ -271,6 +271,12 @@ func CreateAdminRouter(sc *ServerContext) *mux.Router { r.Handle("/_cluster_info", makeHandlerWithOptions(sc, adminPrivs, []Permission{PermDevOps}, nil, (*handler).handleGetClusterInfo, handlerOptions{sgcollect: true})).Methods("GET") + r.Handle("/_cluster_compat_version", + makeHandlerWithOptions(sc, adminPrivs, []Permission{PermDevOps}, nil, (*handler).handleGetClusterCompatVersion, handlerOptions{sgcollect: true})).Methods("GET") + r.Handle("/_cluster_compat_version/freeze", + makeHandler(sc, adminPrivs, []Permission{PermDevOps}, nil, (*handler).handleFreezeClusterCompatVersion)).Methods("POST") + r.Handle("/_cluster_compat_version/unfreeze", + makeHandler(sc, adminPrivs, []Permission{PermDevOps}, nil, (*handler).handleUnfreezeClusterCompatVersion)).Methods("POST") r.Handle("/_status", makeHandlerWithOptions(sc, adminPrivs, []Permission{PermDevOps}, nil, (*handler).handleGetStatus, handlerOptions{sgcollect: true})).Methods("GET")