Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion base/constants_syncdocs.go
Original file line number Diff line number Diff line change
Expand Up @@ -449,7 +449,7 @@ func marshalSyncInfo(syncInfo *SyncInfo, clusterCompatVersion *ClusterCompatVers
if err != nil {
return nil, fmt.Errorf("Error marshalling syncInfo: %v", err)
}
if clusterCompatVersion != nil && clusterCompatVersion.AtLeast(4, 1) {
if clusterCompatVersion.AtLeast(4, 1) {
payload = append([]byte{byte(SyncInfoTypeV1)}, payload...)
}
return payload, nil
Expand Down
7 changes: 6 additions & 1 deletion base/version_cluster_compat.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,12 @@ func NewClusterCompatVersion(major, minor uint8) ClusterCompatVersion {
}

// AtLeast returns true if this version is greater than or equal to the given major.minor.
func (v ClusterCompatVersion) AtLeast(major, minor uint8) bool {
// A nil receiver returns false, representing an unknown cluster compat version that should not
// gate any version-dependent behavior on.
func (v *ClusterCompatVersion) AtLeast(major, minor uint8) bool {
if v == nil {
return false
}
if v.Major != major {
return v.Major > major
}
Expand Down
6 changes: 1 addition & 5 deletions db/background_mgr_attachment_migration.go
Original file line number Diff line number Diff line change
Expand Up @@ -197,11 +197,7 @@ func (a *AttachmentMigrationManager) Run(ctx context.Context, options map[string
// set sync info metadata version
for _, collectionID := range currCollectionIDs {
dbc := db.CollectionByID[collectionID]
var ccv *base.ClusterCompatVersion
if a.databaseCtx.Options.ClusterCompatVersion != nil {
ccv = a.databaseCtx.Options.ClusterCompatVersion()
}
if err := base.SetSyncInfoMetaVersion(ctx, dbc.dataStore, MetaVersionValue, ccv); err != nil {
if err := base.SetSyncInfoMetaVersion(ctx, dbc.dataStore, MetaVersionValue, a.databaseCtx.ClusterCompatVersion()); err != nil {
base.WarnfCtx(ctx, "[%s] Completed attachment migration, but unable to update syncInfo for collection %s: %v", migrationLoggingID, dbc.Name, err)
return err
}
Expand Down
4 changes: 2 additions & 2 deletions db/background_mgr_attachment_migration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -313,15 +313,15 @@ func TestAttachmentMigrationCheckpointPrefix(t *testing.T) {
}

// TestAttachmentMigrationWritesV1SyncInfoAtCcv41 verifies the end-to-end wiring from
// DatabaseContextOptions.ClusterCompatVersion through AttachmentMigrationManager.Run into
// DatabaseContext.ClusterCompatVersion through AttachmentMigrationManager.Run into
// base.SetSyncInfoMetaVersion — when ccv>=4.1 the syncInfo doc is written with the V1
// version-byte prefix.
func TestAttachmentMigrationWritesV1SyncInfoAtCcv41(t *testing.T) {
db, ctx := setupTestDB(t)
defer db.Close(ctx)

ccv := base.NewClusterCompatVersion(4, 1)
db.Options.ClusterCompatVersion = func() *base.ClusterCompatVersion { return &ccv }
db.ClusterCompatVersionFunc = func() *base.ClusterCompatVersion { return &ccv }

collection, ctx := GetSingleDatabaseCollectionWithUser(ctx, t, db)

Expand Down
127 changes: 127 additions & 0 deletions db/background_mgr_metadata_migration.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,127 @@
/*
Copyright 2026-Present Couchbase, Inc.

Use of this software is governed by the Business Source License included in
the file licenses/BSL-Couchbase.txt. As of the Change Date specified in that
file, in accordance with the Business Source License, use of this software will
be governed by the Apache License, Version 2.0, included in the file
licenses/APL2.txt.
*/

package db

import (
"context"
"sync"
"sync/atomic"

"github.com/couchbase/sync_gateway/base"
"github.com/google/uuid"
)

type MetadataMigrationManager struct {
docsProcessed atomic.Int64
docsFailed atomic.Int64
MigrationID string
dbContext *DatabaseContext
lock sync.RWMutex
}

const MetadataMigrationManagerName = "metadata_migration"

var _ BackgroundManagerProcessI = &MetadataMigrationManager{}

func NewMetadataMigrationManager(dbContext *DatabaseContext) *BackgroundManager {
return &BackgroundManager{
name: MetadataMigrationManagerName,
Process: &MetadataMigrationManager{dbContext: dbContext},
clusterAwareOptions: &ClusterAwareBackgroundManagerOptions{
metadataStore: dbContext.MetadataStore,
metaKeys: dbContext.MetadataKeys,
processSuffix: MetadataMigrationManagerName,
},
terminator: base.NewSafeTerminator(),
}
}

type MigrationManagerResponse struct {
BackgroundManagerStatus
DocsProcessed int64 `json:"docs_processed"`
DocsFailed int64 `json:"docs_failed"`
MigrationID string `json:"migration_id"`
}

type MigrationManagerStatusDoc struct {
MigrationManagerResponse `json:"status"`
}

func (m *MetadataMigrationManager) Init(ctx context.Context, options map[string]any, clusterStatus []byte) error {
newRunInit := func() error {
uniqueUUID, err := uuid.NewRandom()
if err != nil {
return err
}

m.MigrationID = uniqueUUID.String()
base.InfofCtx(ctx, base.KeyAll, "Metadata Migration: Starting new migration run with migration ID: %s", m.MigrationID)
return nil
}

if clusterStatus != nil {
var status MigrationManagerStatusDoc
err := base.JSONUnmarshal(clusterStatus, &status)
// If the previous run completed, or there was an error during unmarshalling the status start again
if status.State == BackgroundProcessStateCompleted || err != nil {
return newRunInit()
}
m.docsProcessed.Store(status.DocsProcessed)
m.docsFailed.Store(status.DocsFailed)
m.MigrationID = status.MigrationID
base.InfofCtx(ctx, base.KeyAll, "Metadata Migration: Resuming migration run with migration ID: %s, docs processed: %d, docs failed: %d", m.MigrationID, status.DocsProcessed, status.DocsFailed)
return nil
}
return newRunInit()
}

func (m *MetadataMigrationManager) Run(ctx context.Context, options map[string]any, persistClusterStatusCallback updateStatusCallbackFunc, terminator *base.SafeTerminator) error {
metadataMigrationLoggingID := "Metadata Migration: " + m.MigrationID

persistClusterStatus := func() {
err := persistClusterStatusCallback(ctx)
if err != nil {
base.WarnfCtx(ctx, "[%s] Failed to persist latest cluster status for metadata migration: %v", metadataMigrationLoggingID, err)
}
}
defer persistClusterStatus()

base.InfofCtx(ctx, base.KeyAll, "[%s] Metadata Migration TODO: CBG-5228", metadataMigrationLoggingID)
return nil
}

func (m *MetadataMigrationManager) ResetStatus() {
m.lock.Lock()
defer m.lock.Unlock()
m.docsProcessed.Store(0)
m.docsFailed.Store(0)
m.MigrationID = ""
}

func (m *MetadataMigrationManager) SetProcessStatus(ctx context.Context, previousStatus []byte, newStatus []byte) {
// no-op
}

func (m *MetadataMigrationManager) GetProcessStatus(status BackgroundManagerStatus, previousStatus []byte) (statusOut []byte, meta []byte, err error) {
m.lock.RLock()
defer m.lock.RUnlock()
resp := MigrationManagerResponse{
BackgroundManagerStatus: status,
DocsProcessed: m.docsProcessed.Load(),
DocsFailed: m.docsFailed.Load(),
MigrationID: m.MigrationID,
}
statusOut, err = base.JSONMarshal(resp)
if err != nil {
return nil, nil, err
}
return statusOut, nil, nil
}
154 changes: 154 additions & 0 deletions db/background_mgr_metadata_migration_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,154 @@
/*
Copyright 2026-Present Couchbase, Inc.

Use of this software is governed by the Business Source License included in
the file licenses/BSL-Couchbase.txt. As of the Change Date specified in that
file, in accordance with the Business Source License, use of this software will
be governed by the Apache License, Version 2.0, included in the file
licenses/APL2.txt.
*/

package db

import (
"context"
"testing"
"time"

sgbucket "github.com/couchbase/sg-bucket"
"github.com/couchbase/sync_gateway/base"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
)

// TestShouldRunMetadataMigration is a truth table covering the guard that decides whether
// metadata migration may be armed: it requires both the UseSystemMetadataCollection opt-in
// and a cluster compatibility version of at least 4.1.
func TestShouldRunMetadataMigration(t *testing.T) {
compatVersion := func(major, minor uint8) func() *base.ClusterCompatVersion {
return func() *base.ClusterCompatVersion {
v := base.NewClusterCompatVersion(major, minor)
return &v
}
}
nilCompatVersion := func() *base.ClusterCompatVersion { return nil }

testCases := []struct {
name string
useSystemMeta bool
compatVersion func() *base.ClusterCompatVersion
expectedResult bool
}{
{name: "opt-out, no compat version", useSystemMeta: false, compatVersion: nil, expectedResult: false},
{name: "opt-out, compat 4.1", useSystemMeta: false, compatVersion: compatVersion(4, 1), expectedResult: false},
{name: "opt-in, nil compat func", useSystemMeta: true, compatVersion: nil, expectedResult: false},
{name: "opt-in, compat func returns nil", useSystemMeta: true, compatVersion: nilCompatVersion, expectedResult: false},
{name: "opt-in, compat 3.0", useSystemMeta: true, compatVersion: compatVersion(3, 0), expectedResult: false},
{name: "opt-in, compat 4.0", useSystemMeta: true, compatVersion: compatVersion(4, 0), expectedResult: false},
{name: "opt-in, compat 4.1", useSystemMeta: true, compatVersion: compatVersion(4, 1), expectedResult: true},
{name: "opt-in, compat 4.2", useSystemMeta: true, compatVersion: compatVersion(4, 2), expectedResult: true},
{name: "opt-in, compat 5.0", useSystemMeta: true, compatVersion: compatVersion(5, 0), expectedResult: true},
}

for _, test := range testCases {
t.Run(test.name, func(t *testing.T) {
dbCtx := &DatabaseContext{
Options: DatabaseContextOptions{
UseSystemMetadataCollection: test.useSystemMeta,
},
ClusterCompatVersionFunc: test.compatVersion,
}
assert.Equal(t, test.expectedResult, dbCtx.shouldRunMetadataMigration())
})
}
}

// TestArmMetadataMigrationTaskNoCallback verifies that armMetadataMigrationTask returns
// immediately (rather than blocking on its poll ticker) when no ConfigFullyAppliedFunc has
// been set, so a misconfigured database cannot leak a goroutine waiting forever.
func TestArmMetadataMigrationTaskNoCallback(t *testing.T) {
dbCtx := &DatabaseContext{
Name: "test",
terminator: make(chan bool),
}
// ConfigFullyAppliedFunc is intentionally left nil.

done := make(chan struct{})
go func() {
dbCtx.armMetadataMigrationTask(context.Background())
close(done)
}()

select {
case <-done:
case <-time.After(5 * time.Second):
assert.Fail(t, "armMetadataMigrationTask did not return when ConfigFullyAppliedFunc was nil")
}
}

// TestTryStartMetadataMigrationAlreadyRunning verifies that when the migration is already running on
// another node (the heartbeat lock doc exists), tryStartMetadataMigration reports done=true so the
// arm loop stops rather than re-acquiring the lock once that node completes - which would re-run the
// migration once per node in the cluster.
func TestTryStartMetadataMigrationAlreadyRunning(t *testing.T) {
ctx := base.TestCtx(t)
testBucket := base.GetTestBucket(t)
defer testBucket.Close(ctx)

metadataStore := testBucket.DefaultDataStore(ctx)
metaKeys := base.NewMetadataKeys("test-already-running")

dbCtx := &DatabaseContext{
Name: "test",
terminator: make(chan bool),
MetadataStore: metadataStore,
MetadataKeys: metaKeys,
}
dbCtx.MetadataMigrationManager = NewMetadataMigrationManager(dbCtx)

// Simulate another node holding the migration heartbeat lock, so Start returns
// errBackgroundManagerProcessAlreadyRunning.
heartbeatDocID := dbCtx.MetadataMigrationManager.clusterAwareOptions.HeartbeatDocID()
_, err := metadataStore.WriteCas(ctx, heartbeatDocID, BackgroundManagerHeartbeatExpirySecs, 0, []byte("{}"), sgbucket.Raw)
require.NoError(t, err)

// Config is fully applied so the attempt proceeds to Start.
dbCtx.ConfigFullyAppliedFunc = func(ctx context.Context) (bool, []string, error) {
return true, nil, nil
}

// done=true tells the arm loop to stop polling rather than retry and re-run later.
require.True(t, dbCtx.tryStartMetadataMigration(ctx))

// The local manager must not have started: it should still be in its initial (uninitialized)
// run state, having backed off rather than acquiring the lock.
assert.NotEqual(t, BackgroundProcessStateRunning, dbCtx.MetadataMigrationManager.GetRunState())

// The other node's heartbeat lock must remain untouched.
_, _, err = metadataStore.GetRaw(ctx, heartbeatDocID)
assert.NoError(t, err, "heartbeat lock doc should still exist - attempt must not have taken it")
}

// TestTryStartMetadataMigrationConfigNotApplied verifies that while the cluster has not yet
// converged on the config, tryStartMetadataMigration reports done=false so the arm loop keeps
// polling, and does not start the migration.
func TestTryStartMetadataMigrationConfigNotApplied(t *testing.T) {
ctx := base.TestCtx(t)
testBucket := base.GetTestBucket(t)
defer testBucket.Close(ctx)

dbCtx := &DatabaseContext{
Name: "test",
terminator: make(chan bool),
MetadataStore: testBucket.DefaultDataStore(ctx),
MetadataKeys: base.NewMetadataKeys("test-not-applied"),
}
dbCtx.MetadataMigrationManager = NewMetadataMigrationManager(dbCtx)

dbCtx.ConfigFullyAppliedFunc = func(ctx context.Context) (bool, []string, error) {
return false, []string{"node2"}, nil
}

assert.False(t, dbCtx.tryStartMetadataMigration(ctx))
assert.NotEqual(t, BackgroundProcessStateRunning, dbCtx.MetadataMigrationManager.GetRunState())
}
6 changes: 1 addition & 5 deletions db/background_mgr_resync_dcp.go
Original file line number Diff line number Diff line change
Expand Up @@ -407,11 +407,7 @@ func (r *ResyncManagerDCP) Run(ctx context.Context, options map[string]any, pers
if !ok {
base.WarnfCtx(ctx, "Completed resync, but unable to update syncInfo for collection %v (not found)", collectionID)
}
var ccv *base.ClusterCompatVersion
if db.Options.ClusterCompatVersion != nil {
ccv = db.Options.ClusterCompatVersion()
}
if err := base.SetSyncInfoMetadataID(ctx, dbc.dataStore, db.DatabaseContext.Options.MetadataID, ccv); err != nil {
if err := base.SetSyncInfoMetadataID(ctx, dbc.dataStore, db.DatabaseContext.Options.MetadataID, db.ClusterCompatVersion()); err != nil {
base.WarnfCtx(ctx, "Completed resync, but unable to update syncInfo for collection %v: %v", collectionID, err)
}
updatedDsNames[base.ScopeAndCollectionName{Scope: dbc.ScopeName, Collection: dbc.Name}] = struct{}{}
Expand Down
4 changes: 2 additions & 2 deletions db/background_mgr_resync_dcp_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -939,7 +939,7 @@ func resyncTestModes() []resyncTestCase {
}

// TestResyncManagerDCPWritesV1SyncInfoAtCcv41 verifies the end-to-end wiring from
// DatabaseContextOptions.ClusterCompatVersion through ResyncManagerDCP.Run into
// DatabaseContext.ClusterCompatVersion through ResyncManagerDCP.Run into
// base.SetSyncInfoMetadataID — when ccv>=4.1 the syncInfo doc is written with the V1
// version-byte prefix. regenerateSequences=true is required so the syncInfo update path runs.
func TestResyncManagerDCPWritesV1SyncInfoAtCcv41(t *testing.T) {
Expand All @@ -952,7 +952,7 @@ func TestResyncManagerDCPWritesV1SyncInfoAtCcv41(t *testing.T) {
defer db.Close(ctx)

ccv := base.NewClusterCompatVersion(4, 1)
db.Options.ClusterCompatVersion = func() *base.ClusterCompatVersion { return &ccv }
db.ClusterCompatVersionFunc = func() *base.ClusterCompatVersion { return &ccv }
// Resync only writes syncInfo when MetadataID is non-empty (SetSyncInfoMetadataID is a no-op otherwise).
db.Options.MetadataID = "test_resync_md_id"

Expand Down
Loading
Loading