Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions pkg/sync/normalize_worker_count_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,9 +18,9 @@ func TestNormalizeWorkerCount(t *testing.T) {
want int
}{
{"minus_one_auto", -1, auto},
{"zero_sequential", 0, 0},
{"zero_sequential", 0, 1},
{"positive", 7, 7},
{"below_minus_one_sequential", -99, 0},
{"below_minus_one_sequential", -99, 1},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
Expand Down
222 changes: 0 additions & 222 deletions pkg/sync/parallel_syncer.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,228 +13,6 @@ import (
"go.uber.org/zap"
)

// sequentialSync runs the sync state machine one action at a time on the
// calling goroutine.
//
// Every loop iteration checkpoints, checks the warning budget, checks runCtx
// for cancellation or expiry, and then dispatches on the Op of
// s.state.Current(). The loop ends once s.state.Current() returns nil.
//
// ctx is used for logging, checkpointing and the individual sync steps.
// runCtx is only consulted to decide whether the run has to stop.
// targetedResources, when non-empty, makes InitOp schedule a partial sync of
// just those resources.
//
// It returns the warnings collected so far (on error paths as well) together
// with the error that stopped the sync, or a nil error when no actions remain.
func (s *syncer) sequentialSync(
	ctx context.Context,
	runCtx context.Context,
	targetedResources []*v2.Resource,
) ([]error, error) {
	l := ctxzap.Extract(ctx)

	// NOTE(review): MaxAttempts and MaxDelay of 0 presumably mean "no limit" —
	// confirm against retry.RetryConfig.
	retryer := retry.NewRetryer(ctx, retry.RetryConfig{
		MaxAttempts:  0,
		InitialDelay: 1 * time.Second,
		MaxDelay:     0,
	})

	var warnings []error
	for s.state.Current() != nil {
		// Non-forced checkpoint at the top of every iteration.
		err := s.Checkpoint(ctx, false)
		if err != nil {
			return warnings, err
		}

		// If we have more than 10 warnings and more than 10% of actions ended in a warning, exit the sync.
		if len(warnings) > 10 {
			completedActionsCount := s.state.GetCompletedActionsCount()
			if completedActionsCount > 0 && float64(len(warnings))/float64(completedActionsCount) > 0.1 {
				return warnings, fmt.Errorf("%w: warnings: %v completed actions: %d", ErrTooManyWarnings, warnings, completedActionsCount)
			}
		}
		// Non-blocking check of runCtx: the empty default case lets the loop
		// carry on while runCtx is still live.
		select {
		case <-runCtx.Done():
			err = context.Cause(runCtx)
			switch {
			case errors.Is(err, context.DeadlineExceeded):
				l.Info("sync run duration has expired, exiting sync early", zap.String("sync_id", s.syncID))
				// It would be nice to remove this once we're more confident in the checkpointing logic.
				checkpointErr := s.Checkpoint(ctx, true)
				if checkpointErr != nil {
					l.Error("error checkpointing before exiting sync", zap.Error(checkpointErr))
				}
				// ErrSyncNotComplete is always reported; a checkpoint failure is joined to it.
				return warnings, errors.Join(checkpointErr, ErrSyncNotComplete)
			default:
				l.Error("sync context cancelled", zap.String("sync_id", s.syncID), zap.Error(err))
				return warnings, err
			}
		default:
		}

		stateAction := s.state.Current()

		switch stateAction.Op {
		case InitOp:
			// Finish the init action first, then schedule the actions that make up the sync.
			// NOTE(review): the push order below looks like reverse execution order
			// (presumably the state is a stack) — confirm against the state implementation.
			s.state.FinishAction(ctx, stateAction)

			if s.skipEntitlementsAndGrants {
				s.state.SetShouldSkipEntitlementsAndGrants()
			}
			if s.skipGrants {
				s.state.SetShouldSkipGrants()
			}
			if len(targetedResources) > 0 {
				// Partial sync: one targeted action per requested resource, plus resource types.
				for _, r := range targetedResources {
					s.state.PushAction(ctx, Action{
						Op:                   SyncTargetedResourceOp,
						ResourceID:           r.GetId().GetResource(),
						ResourceTypeID:       r.GetId().GetResourceType(),
						ParentResourceID:     r.GetParentResourceId().GetResource(),
						ParentResourceTypeID: r.GetParentResourceId().GetResourceType(),
					})
				}
				s.state.SetShouldFetchRelatedResources()
				s.state.PushAction(ctx, Action{Op: SyncResourceTypesOp})
				err = s.Checkpoint(ctx, true)
				if err != nil {
					return warnings, err
				}
				// Don't do grant expansion or external resources in partial syncs, as we likely lack related resources/entitlements/grants
				continue
			}

			// FIXME(jirwin): Disabling syncing assets for now
			// s.state.PushAction(ctx, Action{Op: SyncAssetsOp})
			if !s.state.ShouldSkipEntitlementsAndGrants() {
				s.state.PushAction(ctx, Action{Op: SyncGrantExpansionOp})
			}
			if s.externalResourceReader != nil {
				s.state.PushAction(ctx, Action{Op: SyncExternalResourcesOp})
			}
			if s.onlyExpandGrants {
				// Expansion-only run: none of the resource/entitlement/grant sync
				// actions further down are scheduled.
				s.state.SetNeedsExpansion()
				err = s.Checkpoint(ctx, true)
				if err != nil {
					return warnings, err
				}
				continue
			}
			if !s.state.ShouldSkipEntitlementsAndGrants() {
				if !s.state.ShouldSkipGrants() {
					s.state.PushAction(ctx, Action{Op: SyncGrantsOp})
				}

				s.state.PushAction(ctx, Action{Op: SyncEntitlementsOp})

				s.state.PushAction(ctx, Action{Op: SyncStaticEntitlementsOp})
			}
			s.state.PushAction(ctx, Action{Op: SyncResourcesOp})
			s.state.PushAction(ctx, Action{Op: SyncResourceTypesOp})

			// Forced checkpoint once the plan has been pushed.
			err = s.Checkpoint(ctx, true)
			if err != nil {
				return warnings, err
			}
			continue

		case SyncResourceTypesOp:
			err = s.SyncResourceTypes(ctx, stateAction)
			// NOTE(review): this pattern (used by every step below) relies on
			// ShouldWaitAndRetry reporting true for a nil err; otherwise a
			// successful step would end the sync here — confirm in the retry package.
			if !retryer.ShouldWaitAndRetry(ctx, err) {
				return warnings, err
			}
			continue

		case SyncResourcesOp:
			err = s.SyncResources(ctx, stateAction)
			if !retryer.ShouldWaitAndRetry(ctx, err) {
				return warnings, err
			}
			continue

		case SyncTargetedResourceOp:
			err = s.SyncTargetedResource(ctx, stateAction)
			// Errors classified as warnings are recorded and the action is
			// finished, so the sync moves on instead of retrying or failing.
			if isWarning(ctx, err) {
				l.Warn("skipping sync targeted resource action", zap.Any("stateAction", stateAction), zap.Error(err))
				warnings = append(warnings, err)
				s.state.FinishAction(ctx, stateAction)
				continue
			}
			if !retryer.ShouldWaitAndRetry(ctx, err) {
				return warnings, err
			}
			continue

		case SyncStaticEntitlementsOp:
			err = s.SyncStaticEntitlements(ctx, stateAction)
			if isWarning(ctx, err) {
				l.Warn("skipping sync static entitlements action", zap.Any("stateAction", stateAction), zap.Error(err))
				warnings = append(warnings, err)
				s.state.FinishAction(ctx, stateAction)
				continue
			}
			if !retryer.ShouldWaitAndRetry(ctx, err) {
				return warnings, err
			}
			continue
		case SyncEntitlementsOp:
			err = s.SyncEntitlements(ctx, stateAction)
			if isWarning(ctx, err) {
				l.Warn("skipping sync entitlement action", zap.Any("stateAction", stateAction), zap.Error(err))
				warnings = append(warnings, err)
				s.state.FinishAction(ctx, stateAction)
				continue
			}
			if !retryer.ShouldWaitAndRetry(ctx, err) {
				return warnings, err
			}
			continue

		case SyncGrantsOp:
			err = s.SyncGrants(ctx, stateAction)
			if isWarning(ctx, err) {
				l.Warn("skipping sync grant action", zap.Any("stateAction", stateAction), zap.Error(err))
				warnings = append(warnings, err)
				s.state.FinishAction(ctx, stateAction)
				continue
			}
			if !retryer.ShouldWaitAndRetry(ctx, err) {
				return warnings, err
			}
			continue

		case SyncExternalResourcesOp:
			err = s.SyncExternalResources(ctx, stateAction)
			if !retryer.ShouldWaitAndRetry(ctx, err) {
				return warnings, err
			}
			continue
		case SyncAssetsOp:
			err = s.SyncAssets(ctx, stateAction)
			if !retryer.ShouldWaitAndRetry(ctx, err) {
				return warnings, err
			}
			continue

		case SyncGrantExpansionOp:
			// Mark the sync as supporting diff, but only if we're starting fresh.
			// If we're resuming (graph has edges or a page token), we may be continuing
			// from old code that didn't have this marker, so we must not set it.
			entitlementGraph := s.state.EntitlementGraph(ctx)
			isResumingExpansion := entitlementGraph.Loaded || len(entitlementGraph.Edges) > 0 || stateAction.PageToken != ""
			if !isResumingExpansion {
				if err := s.store.SyncMeta().MarkSyncSupportsDiff(ctx, s.syncID); err != nil {
					l.Error("failed to set supports_diff marker", zap.Error(err))
					return warnings, err
				}
			}

			// Expansion is skipped when it is disabled on the syncer or the
			// state does not report that expansion is needed.
			if s.dontExpandGrants || !s.state.NeedsExpansion() {
				l.Debug("skipping grant expansion, no grants to expand")
				s.state.FinishAction(ctx, stateAction)
				continue
			}

			err = s.SyncGrantExpansion(ctx, stateAction)
			if !retryer.ShouldWaitAndRetry(ctx, err) {
				return warnings, err
			}
			continue
		default:
			// An Op this loop does not know about is a hard failure.
			return warnings, fmt.Errorf("unexpected sync step")
		}
	}
	return warnings, nil
}

// TODO: Merge parallelSync and sequentialSync once we're confident in parallelSync's behavior.
func (s *syncer) parallelSync(
ctx context.Context,
runCtx context.Context,
Expand Down
30 changes: 2 additions & 28 deletions pkg/sync/progresslog/progresslog.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,29 +31,14 @@ const (
metricDecompressedBytesDelta = "baton.sync.expand.decompressed_bytes_delta"
)

// rwMutex is the reader/writer locking surface ProgressLog depends on.
// It lists the four methods ProgressLog needs so that either a real
// reader/writer lock or a do-nothing stand-in can be plugged in.
type rwMutex interface {
	// Shared (read) side.
	RLock()
	RUnlock()

	// Exclusive (write) side.
	Lock()
	Unlock()
}

// noOpMutex is a lock-shaped placeholder for sequential sync: it offers the
// usual lock and unlock methods, and each of them does nothing.
type noOpMutex struct{}

// Lock does nothing.
func (*noOpMutex) Lock() {}

// Unlock does nothing.
func (*noOpMutex) Unlock() {}

// RLock does nothing.
func (*noOpMutex) RLock() {}

// RUnlock does nothing.
func (*noOpMutex) RUnlock() {}

type ProgressLog struct {
resourceTypes int
resources map[string]int
entitlementsProgress map[string]int
lastEntitlementLog map[string]time.Time
grantsProgress map[string]int
lastGrantLog map[string]time.Time
mu rwMutex // If noOpMutex, sequential mode is enabled. If sync.RWMutex, parallel mode is enabled.
mu sync.RWMutex
l *zap.Logger
maxLogFrequency time.Duration

Expand Down Expand Up @@ -99,17 +84,6 @@ func WithLogger(l *zap.Logger) Option {
}
}

// WithSequentialMode chooses the lock a ProgressLog uses.
// With sequential set to true the returned Option installs a noOpMutex;
// with false it installs a fresh sync.RWMutex.
func WithSequentialMode(sequential bool) Option {
	if sequential {
		return func(p *ProgressLog) { p.mu = &noOpMutex{} }
	}
	return func(p *ProgressLog) { p.mu = &sync.RWMutex{} }
}

func WithLogFrequency(logFrequency time.Duration) Option {
return func(p *ProgressLog) {
p.maxLogFrequency = logFrequency
Expand Down Expand Up @@ -178,7 +152,7 @@ func NewProgressCounts(ctx context.Context, opts ...Option) *ProgressLog {
lastGrantLog: make(map[string]time.Time),
l: ctxzap.Extract(ctx),
maxLogFrequency: defaultMaxLogFrequency,
mu: &noOpMutex{}, // Default to sequential mode for backward compatibility
mu: sync.RWMutex{},
metricsHandler: metrics.NewNoOpHandler(ctx),
}
for _, o := range opts {
Expand Down
31 changes: 10 additions & 21 deletions pkg/sync/syncer.go
Original file line number Diff line number Diff line change
Expand Up @@ -142,7 +142,7 @@ type syncer struct {
syncResourceTypes []string
previousSyncMu native_sync.Mutex
previousSyncIDPtr atomic.Pointer[string]
workerCount int // If 0, sequential sync is used. If > 0, parallel sync is used.
workerCount int // If 1, sync is sequential (default). If > 1, sync operations are done in parallel.
}

var _ Syncer = (*syncer)(nil)
Expand Down Expand Up @@ -452,17 +452,9 @@ func (s *syncer) Sync(ctx context.Context) error {
)
}

var warnings []error
if s.workerCount == 0 {
warnings, err = s.sequentialSync(ctx, runCtx, targetedResources)
if err != nil {
return err
}
} else {
warnings, err = s.parallelSync(ctx, runCtx, targetedResources)
if err != nil {
return err
}
warnings, err := s.parallelSync(ctx, runCtx, targetedResources)
if err != nil {
return err
}

// Force a checkpoint to clear completed actions & entitlement graph in sync_token.
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🟡 Suggestion: The condition `s.workerCount > 0` is now always true, because `workerCount` is guaranteed to be >= 1 after this PR. Consider changing it to `s.workerCount > 1` to preserve sequential-mode progress reporting for single-worker syncs, or remove the condition entirely.

Expand Down Expand Up @@ -2813,14 +2805,13 @@ func NormalizeWorkerCount(count int) int {
if count == -1 {
return min(max(runtime.GOMAXPROCS(0), 1), 4)
}
return max(count, 0)
return max(count, 1)
}

// WithWorkerCount sets the number of workers to use.
// If 0, sequential sync is used. If > 0, parallel sync is used.
// If <=1, 1 worker is used (default). If > 1, parallel sync is used.
// If -1, the number of workers is set to the number of CPU cores or 4, whichever is lower.
// If < -1, sequential sync is used.
// Yes, this allows for a "parallel" sync with one worker, effectively making it sequential.
// If < -1, 1 worker is used. (Nothing should do this, but there's no way to return an error in this option.)
func WithWorkerCount(count int) SyncOpt {
return func(s *syncer) {
s.workerCount = NormalizeWorkerCount(count)
Expand All @@ -2830,8 +2821,9 @@ func WithWorkerCount(count int) SyncOpt {
// NewSyncer returns a new syncer object.
func NewSyncer(ctx context.Context, c types.ConnectorClient, opts ...SyncOpt) (Syncer, error) {
s := &syncer{
connector: c,
syncType: connectorstore.SyncTypeFull,
connector: c,
syncType: connectorstore.SyncTypeFull,
workerCount: 1,
}

for _, o := range opts {
Expand All @@ -2843,9 +2835,6 @@ func NewSyncer(ctx context.Context, c types.ConnectorClient, opts ...SyncOpt) (S
}

progressLogOpts := []progresslog.Option{}
if s.workerCount > 0 {
progressLogOpts = append(progressLogOpts, progresslog.WithSequentialMode(false))
}
s.counts = progresslog.NewProgressCounts(ctx, progressLogOpts...)
// Wire the DBSizeProvider now if the store is already set (WithConnectorStore
// case). For WithC1ZPath, the store is populated later inside loadStore,
Expand Down
Loading