Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
128 changes: 79 additions & 49 deletions db/background_mgr_resync_dcp.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,8 @@ import (
"context"
"errors"
"fmt"
"maps"
"slices"
"sort"
"strings"
"sync"
Expand All @@ -28,12 +30,16 @@ import (
// =====================================================================

type ResyncManagerDCP struct {
docsProcessedLocal atomic.Int64 // number of documents processed locally on this node since the last start or resume of resync
docsChangedLocal atomic.Int64 // number of documents changed locally on this node since the last start or resume of resync
docsProcessedLocalSerialized atomic.Int64 // number of documents processed locally on this node that have been serialized to the status document
docsChangedLocalSerialized atomic.Int64 // number of documents changed locally on this node that have been serialized to the status document
docsProcessedCrossNode atomic.Int64 // number of documents processed across all nodes, as reported by the status document
docsChangedCrossNode atomic.Int64 // number of documents changed across all nodes, as reported by the status document
docsProcessedLocal atomic.Int64 // number of documents processed locally on this node since the last start or resume of resync
docsChangedLocal atomic.Int64 // number of documents changed locally on this node since the last start or resume of resync
docsErroredLocal atomic.Int64 // number of documents that failed to resync locally on this node since the last start or resume of resync
docsProcessedLocalSerialized atomic.Int64 // number of documents processed locally on this node that have been serialized to the status document
docsChangedLocalSerialized atomic.Int64 // number of documents changed locally on this node that have been serialized to the status document
docsErroredLocalSerialized atomic.Int64 // number of documents that failed to resync locally on this node that have been serialized to the status document
docsProcessedCrossNode atomic.Int64 // number of documents processed across all nodes, as reported by the status document
docsChangedCrossNode atomic.Int64 // number of documents changed across all nodes, as reported by the status document
docsErroredCrossNode atomic.Int64 // number of documents that failed to resync across all nodes, as reported by the status document
docsTargeted atomic.Uint64 // number of documents targeted for resync, computed once at the start of a new run
ResyncID string
VBUUIDs []uint64
useXattrs bool
Expand All @@ -45,8 +51,8 @@ type ResyncManagerDCP struct {

// resyncCollectionInfo contains information on collections included on resync run, populated in init() and used in Run()
type resyncCollectionInfo struct {
hasAllCollections bool
collectionIDs []uint32
hasAllCollections bool
}

var _ BackgroundManagerProcessI = &ResyncManagerDCP{}
Expand Down Expand Up @@ -75,15 +81,19 @@ func (r *ResyncManagerDCP) Init(ctx context.Context, options map[string]any, clu
return errors.New("collections option is required and must be of type base.CollectionNames")
}

// Get collectionIds and store in manager for use in DCP client later
collectionIDs, hasAllCollections, collectionNames, err := getCollectionIdsAndNames(db, resyncCollections)
if err != nil {
return err
var collections DatabaseCollections
if len(resyncCollections) > 0 {
var err error
collections, err = db.collections(resyncCollections)
if err != nil {
return err
}
} else {
collections = slices.Collect(maps.Values(db.CollectionByID))
r.hasAllCollections = true
}
r.collectionIDs = collectionIDs
r.hasAllCollections = hasAllCollections
// add collection list to manager for use in status call
r.SetCollectionStatus(collectionNames)
r.setCollectionStatus(collections)

// If the previous run completed, or we couldn't determine, we will start the resync with a new resync ID.
// Otherwise, we should resume with the resync ID, and the previous stats specified in the doc.
Expand All @@ -100,9 +110,7 @@ func (r *ResyncManagerDCP) Init(ctx context.Context, options map[string]any, clu
} else if !base.SlicesEqualIgnoreOrder(r.collectionIDs, statusDoc.CollectionIDs) {
resetMsg = "collection IDs have changed"
} else {
// use the resync ID from the status doc to resume
r.ResyncID = statusDoc.ResyncID
r.SetStatus(statusDoc.DocsChanged, statusDoc.DocsProcessed)
r.initializeFromPreviousStatus(statusDoc)
base.InfofCtx(ctx, base.KeyAll, "Resuming resync with ID: %q", r.ResyncID)
return nil
}
Expand All @@ -118,11 +126,31 @@ func (r *ResyncManagerDCP) Init(ctx context.Context, options map[string]any, clu
if err != nil {
return err
}

docsTargeted, err := totalResyncDocs(ctx, collections)
if err != nil {
base.WarnfCtx(ctx, "Failed to count total documents for resync: %v, continuing resync", err)
}
r.docsTargeted.Store(docsTargeted)

r.ResyncID = newID.String()
base.InfofCtx(ctx, base.KeyAll, "Running new resync process with ID: %q - %s", r.ResyncID, resetMsg)
return nil
}

// totalResyncDocs adds up the CountAllDocs result of every collection in collections, producing an estimate of
// how many documents a resync run covers. If any collection fails to report its count, the partial sum is
// discarded and 0 is returned together with an error naming that collection.
func totalResyncDocs(ctx context.Context, collections DatabaseCollections) (uint64, error) {
	var docCount uint64
	for i := range collections {
		c := collections[i]
		n, countErr := c.CountAllDocs(ctx)
		if countErr != nil {
			return 0, base.RedactErrorf("failed to count docs for collection %s.%s: %w", base.MD(c.ScopeName), base.MD(c.Name), countErr)
		}
		docCount += n
	}
	return docCount, nil
}

// purgeCheckpoints removes checkpoints for a given resync run.
func (r *ResyncManagerDCP) purgeCheckpoints(ctx context.Context, db *Database, resyncID string) error {
return base.PurgeDCPCheckpoints(
Expand Down Expand Up @@ -221,6 +249,7 @@ func (r *ResyncManagerDCP) Run(ctx context.Context, options map[string]any, pers
db.DbStats.Database().ResyncNumChanged.Add(1)
databaseCollection.collectionStats.ResyncNumChanged.Add(1)
} else if err != base.ErrUpdateCancel {
r.docsErroredLocal.Add(1)
base.WarnfCtx(ctx, "Resync: Error updating doc %q: %v", base.UD(docID), err)
return false
}
Expand Down Expand Up @@ -419,30 +448,6 @@ func (r *ResyncManagerDCP) Run(ctx context.Context, options map[string]any, pers
return nil
}

// getCollectionIdsAndNames resolves the set of collections a resync run operates on.
//
// When resyncCollections is empty, every collection registered in db.CollectionByID is selected:
// hasAllCollections is true and collectionNames comes from db.collectionNames(). Otherwise each requested
// scope/collection pair is looked up via db.GetDatabaseCollection, hasAllCollections is false, and
// resyncCollections is returned unchanged as collectionNames. A failed lookup aborts with an error naming the
// (redacted) scope and collection.
func getCollectionIdsAndNames(db *Database, resyncCollections base.CollectionNames) (collectionIDs []uint32, hasAllCollections bool, collectionNames base.CollectionNames, err error) {
	// No explicit selection: include every collection known to the database.
	if len(resyncCollections) == 0 {
		for id := range db.CollectionByID {
			collectionIDs = append(collectionIDs, id)
		}
		return collectionIDs, true, db.collectionNames(), nil
	}

	for scope, names := range resyncCollections {
		for _, name := range names {
			dbCollection, lookupErr := db.GetDatabaseCollection(scope, name)
			if lookupErr != nil {
				return nil, false, nil, fmt.Errorf("failed to find ID for collection %s.%s", base.MD(scope).Redact(), base.MD(name).Redact())
			}
			collectionIDs = append(collectionIDs, dbCollection.GetCollectionID())
		}
	}
	return collectionIDs, false, resyncCollections, nil
}

func (r *ResyncManagerDCP) ResetStatus() {
r.lock.Lock()
defer r.lock.Unlock()
Expand All @@ -453,20 +458,32 @@ func (r *ResyncManagerDCP) ResetStatus() {
r.docsChangedLocalSerialized.Store(0)
r.docsChangedLocal.Store(0)
r.docsChangedCrossNode.Store(0)
r.docsErroredLocalSerialized.Store(0)
r.docsErroredLocal.Store(0)
r.docsErroredCrossNode.Store(0)
r.docsTargeted.Store(0)
r.ResyncedCollections = nil
}

func (r *ResyncManagerDCP) SetStatus(docChanged, docProcessed int64) {
r.docsChangedLocal.Store(docChanged)
r.docsProcessedLocal.Store(docProcessed)
// initializeFromPreviousStatus restores the in-memory state of the manager from a previously persisted status
// document so that a resumed run starts with the correct accumulated counts.
func (r *ResyncManagerDCP) initializeFromPreviousStatus(statusDoc ResyncManagerStatusDocDCP) {
r.lock.Lock()
defer r.lock.Unlock()
r.ResyncID = statusDoc.ResyncID
r.docsChangedLocal.Store(statusDoc.DocsChanged)
r.docsProcessedLocal.Store(statusDoc.DocsProcessed)
r.docsErroredLocal.Store(statusDoc.DocsErrored)
r.docsTargeted.Store(statusDoc.DocsTargeted)
}

// SetCollectionStatus sets the active collection names being resynced.
func (r *ResyncManagerDCP) SetCollectionStatus(collectionNames base.CollectionNames) {
// setCollectionStatus sets the active collections being resynced.
func (r *ResyncManagerDCP) setCollectionStatus(collections DatabaseCollections) {
r.lock.Lock()
Comment thread
torcolvin marked this conversation as resolved.
defer r.lock.Unlock()

r.ResyncedCollections = collectionNames
r.ResyncedCollections = collections.getNames()
r.collectionIDs = collections.getIDs()
}

// ResyncManagerResponseDCP is the struct used to serialize the status of the resync process. This matches the output
Expand All @@ -480,8 +497,10 @@ type ResyncManagerResponseDCP struct {

// resyncStats is a part of the ResyncManagerResponseDCP struct that only contains the stats fields for efficiency.
type resyncStats struct {
DocsChanged int64 `json:"docs_changed"`
DocsProcessed int64 `json:"docs_processed"`
DocsChanged int64 `json:"docs_changed"`
DocsProcessed int64 `json:"docs_processed"`
DocsErrored int64 `json:"docs_errored"`
DocsTargeted uint64 `json:"docs_targeted"`
}

// SetProcessStatus reports the new status that was serialized to the bucket along with the last status that polled
Expand All @@ -508,8 +527,10 @@ func (r *ResyncManagerDCP) SetProcessStatus(ctx context.Context, previousStatus

r.docsProcessedCrossNode.Store(newStats.DocsProcessed)
r.docsChangedCrossNode.Store(newStats.DocsChanged)
r.docsErroredCrossNode.Store(newStats.DocsErrored)
r.docsProcessedLocalSerialized.Add(newStats.DocsProcessed - previousStats.DocsProcessed)
r.docsChangedLocalSerialized.Add(newStats.DocsChanged - previousStats.DocsChanged)
r.docsErroredLocalSerialized.Add(newStats.DocsErrored - previousStats.DocsErrored)
}

func (r *ResyncManagerDCP) GetProcessStatus(status BackgroundManagerStatus, previousStatus []byte) ([]byte, []byte, error) {
Expand All @@ -530,6 +551,8 @@ func (r *ResyncManagerDCP) GetProcessStatus(status BackgroundManagerStatus, prev
resyncStats: resyncStats{
DocsChanged: previousStats.DocsChanged + (r.docsChangedLocal.Load() - r.docsChangedLocalSerialized.Load()),
DocsProcessed: previousStats.DocsProcessed + (r.docsProcessedLocal.Load() - r.docsProcessedLocalSerialized.Load()),
DocsErrored: previousStats.DocsErrored + (r.docsErroredLocal.Load() - r.docsErroredLocalSerialized.Load()),
DocsTargeted: r.docsTargeted.Load(),
},
CollectionsProcessing: r.ResyncedCollections,
}
Expand All @@ -538,6 +561,7 @@ func (r *ResyncManagerDCP) GetProcessStatus(status BackgroundManagerStatus, prev
if len(previousStatus) == 0 {
response.DocsChanged = r.DocsChanged()
response.DocsProcessed = r.DocsProcessed()
response.DocsErrored = r.DocsErrored()
}

meta := ResyncManagerMeta{
Expand Down Expand Up @@ -569,6 +593,12 @@ func (r *ResyncManagerDCP) DocsProcessed() int64 {
return r.docsProcessedCrossNode.Load() + r.docsProcessedLocal.Load() - r.docsProcessedLocalSerialized.Load()
}

// DocsErrored reports how many documents have failed to resync over the whole resync process, including
// failures on other nodes. It is the cross-node count plus the local errors that have not yet been
// accounted for as serialized.
func (r *ResyncManagerDCP) DocsErrored() int64 {
	crossNode := r.docsErroredCrossNode.Load()
	local := r.docsErroredLocal.Load()
	serialized := r.docsErroredLocalSerialized.Load()
	return crossNode + local - serialized
}

type ResyncManagerMeta struct {
VBUUIDs []uint64 `json:"vbuuids"`
CollectionIDs []uint32 `json:"collection_ids,omitempty"`
Expand Down
45 changes: 30 additions & 15 deletions db/background_mgr_resync_dcp_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,15 +29,17 @@ import (

func TestResyncDCPInit(t *testing.T) {
testCases := []struct {
title string
initialClusterState ResyncManagerStatusDocDCP
forceReset bool
shouldCreateNewRun bool
title string
initialClusterState ResyncManagerStatusDocDCP
forceReset bool
shouldCreateNewRun bool
expectedDocsTargeted uint64
}{
{
title: "Initialize new run with empty cluster state",
forceReset: false,
shouldCreateNewRun: true,
title: "Initialize new run with empty cluster state",
forceReset: false,
shouldCreateNewRun: true,
expectedDocsTargeted: 0, // no docs in DB
},
{
title: "Reinitialize existing run",
Expand All @@ -50,14 +52,16 @@ func TestResyncDCPInit(t *testing.T) {
resyncStats: resyncStats{
DocsChanged: 10,
DocsProcessed: 20,
DocsTargeted: 50,
},
},
ResyncManagerMeta: ResyncManagerMeta{
VBUUIDs: []uint64{1},
},
},
forceReset: false,
shouldCreateNewRun: false,
forceReset: false,
shouldCreateNewRun: false,
expectedDocsTargeted: 50, // restored from persisted meta
},
{
title: "Restart existing run new Collection",
Expand All @@ -77,8 +81,9 @@ func TestResyncDCPInit(t *testing.T) {
CollectionIDs: []uint32{123},
},
},
forceReset: false,
shouldCreateNewRun: true,
forceReset: false,
shouldCreateNewRun: true,
expectedDocsTargeted: 0, // no docs in DB
},
{
title: "Reinitialize completed run",
Expand All @@ -97,8 +102,9 @@ func TestResyncDCPInit(t *testing.T) {
VBUUIDs: []uint64{1},
},
},
forceReset: false,
shouldCreateNewRun: true,
forceReset: false,
shouldCreateNewRun: true,
expectedDocsTargeted: 0, // no docs in DB
},
{
title: "Force restart existing run",
Expand All @@ -117,8 +123,9 @@ func TestResyncDCPInit(t *testing.T) {
VBUUIDs: []uint64{1},
},
},
forceReset: true,
shouldCreateNewRun: true,
forceReset: true,
shouldCreateNewRun: true,
expectedDocsTargeted: 0, // no docs in DB
},
}

Expand Down Expand Up @@ -172,6 +179,7 @@ func TestResyncDCPInit(t *testing.T) {
assert.Equal(t, testCase.initialClusterState.DocsChanged, response.DocsChanged)
assert.Equal(t, testCase.initialClusterState.DocsProcessed, response.DocsProcessed)
}
assert.Equal(t, testCase.expectedDocsTargeted, response.DocsTargeted)

assert.Equal(t, int64(0), db.DbStats.Database().ResyncNumChanged.Value())
assert.Equal(t, int64(0), db.DbStats.Database().ResyncNumProcessed.Value())
Expand Down Expand Up @@ -256,6 +264,7 @@ func TestResyncManagerDCPStart(t *testing.T) {

assert.GreaterOrEqual(t, stats.DocsProcessed, int64(docsToCreate)) // may be processing tombstones from previous tests
assert.Equal(t, int64(0), stats.DocsChanged)
assert.GreaterOrEqual(t, stats.DocsTargeted, uint64(docsToCreate))

assert.GreaterOrEqual(t, db.DbStats.Database().ResyncNumProcessed.Value(), int64(docsToCreate))
assert.Equal(t, db.DbStats.Database().ResyncNumChanged.Value(), int64(0))
Expand Down Expand Up @@ -312,6 +321,7 @@ func TestResyncManagerDCPStart(t *testing.T) {
// be greater than DocsChanged
assert.GreaterOrEqual(t, stats.DocsProcessed, int64(docsToCreate))
assert.Equal(t, int64(docsToCreate), stats.DocsChanged)
assert.GreaterOrEqual(t, stats.DocsTargeted, uint64(docsToCreate))

assert.GreaterOrEqual(t, db.DbStats.Database().ResyncNumProcessed.Value(), int64(docsToCreate))
assert.Equal(t, db.DbStats.Database().ResyncNumChanged.Value(), int64(docsToCreate))
Expand Down Expand Up @@ -421,6 +431,9 @@ func TestResyncManagerDCPResumeStoppedProcess(t *testing.T) {

require.Less(t, stats.DocsProcessed, int64(docsToCreate), "DocsProcessed is equal to docs created. Consider setting docsToCreate > %d.", docsToCreate)
assert.Less(t, stats.DocsChanged, int64(docsToCreate))
// DocsTargeted is computed once at run start and should be >= docsToCreate
assert.GreaterOrEqual(t, stats.DocsTargeted, uint64(docsToCreate))
initialDocsTargeted := stats.DocsTargeted

assert.Less(t, db.DbStats.Database().ResyncNumProcessed.Value(), int64(docsToCreate))
assert.Less(t, db.DbStats.Database().ResyncNumChanged.Value(), int64(docsToCreate))
Expand All @@ -433,6 +446,8 @@ func TestResyncManagerDCPResumeStoppedProcess(t *testing.T) {

assert.GreaterOrEqual(t, stats.DocsProcessed, int64(docsToCreate))
assert.Equal(t, int64(docsToCreate), stats.DocsChanged)
// DocsTargeted is preserved from the original run start, even after resume
assert.Equal(t, initialDocsTargeted, stats.DocsTargeted)

assert.GreaterOrEqual(t, db.DbStats.Database().ResyncNumProcessed.Value(), int64(docsToCreate))
assert.Equal(t, int64(docsToCreate), db.DbStats.Database().ResyncNumChanged.Value())
Expand Down
16 changes: 16 additions & 0 deletions db/database.go
Original file line number Diff line number Diff line change
Expand Up @@ -2026,6 +2026,7 @@ func initDatabaseStats(ctx context.Context, dbName string, autoImport bool, opti
QueryTypeTombstones,
QueryTypeResync,
QueryTypeAllDocs,
QueryTypeCountDocs,
QueryTypeUsers,
}
}
Expand Down Expand Up @@ -2614,3 +2615,18 @@ func (db *DatabaseContext) InitializeOfflineMode() {
db.DBStateManager.SetResyncFunc(TempResyncHandler)
db.DBStateManager.StartPolling(db.CancelContext)
}

// collections returns the DatabaseCollection objects matching the given collection names.
//
// names is ranged as scope name -> collection names within that scope. Each pair is resolved with
// GetDatabaseCollection. The first pair that cannot be resolved aborts the lookup: a nil slice is returned
// together with an error that identifies the scope and collection and wraps the underlying lookup failure.
func (db *DatabaseContext) collections(names base.CollectionNames) (DatabaseCollections, error) {
	// len(names) counts scopes rather than collections, so it is only a rough capacity hint.
	collections := make(DatabaseCollections, 0, len(names))
	for scopeName, collectionsName := range names {
		for _, collectionName := range collectionsName {
			collection, err := db.GetDatabaseCollection(scopeName, collectionName)
			if err != nil {
				// Hand the metadata values to RedactErrorf as-is rather than pre-redacted strings, and
				// wrap err so the reason the lookup failed is not lost.
				return nil, base.RedactErrorf("failed to find ID for collection %s.%s: %w", base.MD(scopeName), base.MD(collectionName), err)
			}
			collections = append(collections, collection)
		}
	}
	return collections, nil
}
Loading
Loading