From 093074b9f785fef268830bc5ff17be72a34c44d9 Mon Sep 17 00:00:00 2001 From: Geoff Greer Date: Tue, 26 May 2026 15:21:01 -0700 Subject: [PATCH] Simplify syncer code. Use the parallel syncer for all syncs. Default to 1 worker so behavior is the same. --- pkg/sync/normalize_worker_count_test.go | 4 +- pkg/sync/parallel_syncer.go | 222 ------------------------ pkg/sync/progresslog/progresslog.go | 30 +--- pkg/sync/syncer.go | 31 ++-- 4 files changed, 14 insertions(+), 273 deletions(-) diff --git a/pkg/sync/normalize_worker_count_test.go b/pkg/sync/normalize_worker_count_test.go index 620743631..32c015b6a 100644 --- a/pkg/sync/normalize_worker_count_test.go +++ b/pkg/sync/normalize_worker_count_test.go @@ -18,9 +18,9 @@ func TestNormalizeWorkerCount(t *testing.T) { want int }{ {"minus_one_auto", -1, auto}, - {"zero_sequential", 0, 0}, + {"zero_sequential", 0, 1}, {"positive", 7, 7}, - {"below_minus_one_sequential", -99, 0}, + {"below_minus_one_sequential", -99, 1}, } for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { diff --git a/pkg/sync/parallel_syncer.go b/pkg/sync/parallel_syncer.go index da26db45e..9cf4ab5c1 100644 --- a/pkg/sync/parallel_syncer.go +++ b/pkg/sync/parallel_syncer.go @@ -13,228 +13,6 @@ import ( "go.uber.org/zap" ) -func (s *syncer) sequentialSync( - ctx context.Context, - runCtx context.Context, - targetedResources []*v2.Resource, -) ([]error, error) { - l := ctxzap.Extract(ctx) - - retryer := retry.NewRetryer(ctx, retry.RetryConfig{ - MaxAttempts: 0, - InitialDelay: 1 * time.Second, - MaxDelay: 0, - }) - - var warnings []error - for s.state.Current() != nil { - err := s.Checkpoint(ctx, false) - if err != nil { - return warnings, err - } - - // If we have more than 10 warnings and more than 10% of actions ended in a warning, exit the sync. 
- if len(warnings) > 10 { - completedActionsCount := s.state.GetCompletedActionsCount() - if completedActionsCount > 0 && float64(len(warnings))/float64(completedActionsCount) > 0.1 { - return warnings, fmt.Errorf("%w: warnings: %v completed actions: %d", ErrTooManyWarnings, warnings, completedActionsCount) - } - } - select { - case <-runCtx.Done(): - err = context.Cause(runCtx) - switch { - case errors.Is(err, context.DeadlineExceeded): - l.Info("sync run duration has expired, exiting sync early", zap.String("sync_id", s.syncID)) - // It would be nice to remove this once we're more confident in the checkpointing logic. - checkpointErr := s.Checkpoint(ctx, true) - if checkpointErr != nil { - l.Error("error checkpointing before exiting sync", zap.Error(checkpointErr)) - } - return warnings, errors.Join(checkpointErr, ErrSyncNotComplete) - default: - l.Error("sync context cancelled", zap.String("sync_id", s.syncID), zap.Error(err)) - return warnings, err - } - default: - } - - stateAction := s.state.Current() - - switch stateAction.Op { - case InitOp: - s.state.FinishAction(ctx, stateAction) - - if s.skipEntitlementsAndGrants { - s.state.SetShouldSkipEntitlementsAndGrants() - } - if s.skipGrants { - s.state.SetShouldSkipGrants() - } - if len(targetedResources) > 0 { - for _, r := range targetedResources { - s.state.PushAction(ctx, Action{ - Op: SyncTargetedResourceOp, - ResourceID: r.GetId().GetResource(), - ResourceTypeID: r.GetId().GetResourceType(), - ParentResourceID: r.GetParentResourceId().GetResource(), - ParentResourceTypeID: r.GetParentResourceId().GetResourceType(), - }) - } - s.state.SetShouldFetchRelatedResources() - s.state.PushAction(ctx, Action{Op: SyncResourceTypesOp}) - err = s.Checkpoint(ctx, true) - if err != nil { - return warnings, err - } - // Don't do grant expansion or external resources in partial syncs, as we likely lack related resources/entitlements/grants - continue - } - - // FIXME(jirwin): Disabling syncing assets for now - // 
s.state.PushAction(ctx, Action{Op: SyncAssetsOp}) - if !s.state.ShouldSkipEntitlementsAndGrants() { - s.state.PushAction(ctx, Action{Op: SyncGrantExpansionOp}) - } - if s.externalResourceReader != nil { - s.state.PushAction(ctx, Action{Op: SyncExternalResourcesOp}) - } - if s.onlyExpandGrants { - s.state.SetNeedsExpansion() - err = s.Checkpoint(ctx, true) - if err != nil { - return warnings, err - } - continue - } - if !s.state.ShouldSkipEntitlementsAndGrants() { - if !s.state.ShouldSkipGrants() { - s.state.PushAction(ctx, Action{Op: SyncGrantsOp}) - } - - s.state.PushAction(ctx, Action{Op: SyncEntitlementsOp}) - - s.state.PushAction(ctx, Action{Op: SyncStaticEntitlementsOp}) - } - s.state.PushAction(ctx, Action{Op: SyncResourcesOp}) - s.state.PushAction(ctx, Action{Op: SyncResourceTypesOp}) - - err = s.Checkpoint(ctx, true) - if err != nil { - return warnings, err - } - continue - - case SyncResourceTypesOp: - err = s.SyncResourceTypes(ctx, stateAction) - if !retryer.ShouldWaitAndRetry(ctx, err) { - return warnings, err - } - continue - - case SyncResourcesOp: - err = s.SyncResources(ctx, stateAction) - if !retryer.ShouldWaitAndRetry(ctx, err) { - return warnings, err - } - continue - - case SyncTargetedResourceOp: - err = s.SyncTargetedResource(ctx, stateAction) - if isWarning(ctx, err) { - l.Warn("skipping sync targeted resource action", zap.Any("stateAction", stateAction), zap.Error(err)) - warnings = append(warnings, err) - s.state.FinishAction(ctx, stateAction) - continue - } - if !retryer.ShouldWaitAndRetry(ctx, err) { - return warnings, err - } - continue - - case SyncStaticEntitlementsOp: - err = s.SyncStaticEntitlements(ctx, stateAction) - if isWarning(ctx, err) { - l.Warn("skipping sync static entitlements action", zap.Any("stateAction", stateAction), zap.Error(err)) - warnings = append(warnings, err) - s.state.FinishAction(ctx, stateAction) - continue - } - if !retryer.ShouldWaitAndRetry(ctx, err) { - return warnings, err - } - continue - case 
SyncEntitlementsOp: - err = s.SyncEntitlements(ctx, stateAction) - if isWarning(ctx, err) { - l.Warn("skipping sync entitlement action", zap.Any("stateAction", stateAction), zap.Error(err)) - warnings = append(warnings, err) - s.state.FinishAction(ctx, stateAction) - continue - } - if !retryer.ShouldWaitAndRetry(ctx, err) { - return warnings, err - } - continue - - case SyncGrantsOp: - err = s.SyncGrants(ctx, stateAction) - if isWarning(ctx, err) { - l.Warn("skipping sync grant action", zap.Any("stateAction", stateAction), zap.Error(err)) - warnings = append(warnings, err) - s.state.FinishAction(ctx, stateAction) - continue - } - if !retryer.ShouldWaitAndRetry(ctx, err) { - return warnings, err - } - continue - - case SyncExternalResourcesOp: - err = s.SyncExternalResources(ctx, stateAction) - if !retryer.ShouldWaitAndRetry(ctx, err) { - return warnings, err - } - continue - case SyncAssetsOp: - err = s.SyncAssets(ctx, stateAction) - if !retryer.ShouldWaitAndRetry(ctx, err) { - return warnings, err - } - continue - - case SyncGrantExpansionOp: - // Mark the sync as supporting diff, but only if we're starting fresh. - // If we're resuming (graph has edges or a page token), we may be continuing - // from old code that didn't have this marker, so we must not set it. 
- entitlementGraph := s.state.EntitlementGraph(ctx) - isResumingExpansion := entitlementGraph.Loaded || len(entitlementGraph.Edges) > 0 || stateAction.PageToken != "" - if !isResumingExpansion { - if err := s.store.SyncMeta().MarkSyncSupportsDiff(ctx, s.syncID); err != nil { - l.Error("failed to set supports_diff marker", zap.Error(err)) - return warnings, err - } - } - - if s.dontExpandGrants || !s.state.NeedsExpansion() { - l.Debug("skipping grant expansion, no grants to expand") - s.state.FinishAction(ctx, stateAction) - continue - } - - err = s.SyncGrantExpansion(ctx, stateAction) - if !retryer.ShouldWaitAndRetry(ctx, err) { - return warnings, err - } - continue - default: - return warnings, fmt.Errorf("unexpected sync step") - } - } - return warnings, nil -} - -// TODO: Merge parallelSync and sequentialSync once we're confident in parallelSync's behavior. func (s *syncer) parallelSync( ctx context.Context, runCtx context.Context, diff --git a/pkg/sync/progresslog/progresslog.go b/pkg/sync/progresslog/progresslog.go index cacda05bc..befe812ba 100644 --- a/pkg/sync/progresslog/progresslog.go +++ b/pkg/sync/progresslog/progresslog.go @@ -31,21 +31,6 @@ const ( metricDecompressedBytesDelta = "baton.sync.expand.decompressed_bytes_delta" ) -type rwMutex interface { - Lock() - Unlock() - RLock() - RUnlock() -} - -// noOpMutex fakes a mutex for sequential sync. -type noOpMutex struct{} - -func (m *noOpMutex) Lock() {} -func (m *noOpMutex) Unlock() {} -func (m *noOpMutex) RLock() {} -func (m *noOpMutex) RUnlock() {} - type ProgressLog struct { resourceTypes int resources map[string]int @@ -53,7 +38,7 @@ type ProgressLog struct { lastEntitlementLog map[string]time.Time grantsProgress map[string]int lastGrantLog map[string]time.Time - mu rwMutex // If noOpMutex, sequential mode is enabled. If sync.RWMutex, parallel mode is enabled. 
+ mu sync.RWMutex l *zap.Logger maxLogFrequency time.Duration @@ -99,17 +84,6 @@ func WithLogger(l *zap.Logger) Option { } } -// WithSequentialMode enables/disables mutex protection for sequential sync. -func WithSequentialMode(sequential bool) Option { - return func(p *ProgressLog) { - if sequential { - p.mu = &noOpMutex{} - } else { - p.mu = &sync.RWMutex{} - } - } -} - func WithLogFrequency(logFrequency time.Duration) Option { return func(p *ProgressLog) { p.maxLogFrequency = logFrequency @@ -178,7 +152,7 @@ func NewProgressCounts(ctx context.Context, opts ...Option) *ProgressLog { lastGrantLog: make(map[string]time.Time), l: ctxzap.Extract(ctx), maxLogFrequency: defaultMaxLogFrequency, - mu: &noOpMutex{}, // Default to sequential mode for backward compatibility + mu: sync.RWMutex{}, metricsHandler: metrics.NewNoOpHandler(ctx), } for _, o := range opts { diff --git a/pkg/sync/syncer.go b/pkg/sync/syncer.go index 47de7dc6e..f11f0a277 100644 --- a/pkg/sync/syncer.go +++ b/pkg/sync/syncer.go @@ -142,7 +142,7 @@ type syncer struct { syncResourceTypes []string previousSyncMu native_sync.Mutex previousSyncIDPtr atomic.Pointer[string] - workerCount int // If 0, sequential sync is used. If > 0, parallel sync is used. + workerCount int // If 1, sync is sequential (default). If > 1, sync operations are done in parallel. } var _ Syncer = (*syncer)(nil) @@ -452,17 +452,9 @@ func (s *syncer) Sync(ctx context.Context) error { ) } - var warnings []error - if s.workerCount == 0 { - warnings, err = s.sequentialSync(ctx, runCtx, targetedResources) - if err != nil { - return err - } - } else { - warnings, err = s.parallelSync(ctx, runCtx, targetedResources) - if err != nil { - return err - } + warnings, err := s.parallelSync(ctx, runCtx, targetedResources) + if err != nil { + return err } // Force a checkpoint to clear completed actions & entitlement graph in sync_token. 
@@ -2813,14 +2805,13 @@ func NormalizeWorkerCount(count int) int { if count == -1 { return min(max(runtime.GOMAXPROCS(0), 1), 4) } - return max(count, 0) + return max(count, 1) } // WithWorkerCount sets the number of workers to use. -// If 0, sequential sync is used. If > 0, parallel sync is used. +// If 0 or 1, 1 worker is used (default). If > 1, parallel sync is used. // If -1, the number of workers is set to the number of CPU cores or 4, whichever is lower. -// If < -1, sequential sync is used. -// Yes, this allows for a "parallel" sync with one worker, effectively making it sequential. +// If < -1, 1 worker is used. (Nothing should do this, but there's no way to return an error in this option.) func WithWorkerCount(count int) SyncOpt { return func(s *syncer) { s.workerCount = NormalizeWorkerCount(count) @@ -2830,8 +2821,9 @@ func WithWorkerCount(count int) SyncOpt { // NewSyncer returns a new syncer object. func NewSyncer(ctx context.Context, c types.ConnectorClient, opts ...SyncOpt) (Syncer, error) { s := &syncer{ - connector: c, - syncType: connectorstore.SyncTypeFull, + connector: c, + syncType: connectorstore.SyncTypeFull, + workerCount: 1, } for _, o := range opts { @@ -2843,9 +2835,6 @@ func NewSyncer(ctx context.Context, c types.ConnectorClient, opts ...SyncOpt) (S } progressLogOpts := []progresslog.Option{} - if s.workerCount > 0 { - progressLogOpts = append(progressLogOpts, progresslog.WithSequentialMode(false)) - } s.counts = progresslog.NewProgressCounts(ctx, progressLogOpts...) // Wire the DBSizeProvider now if the store is already set (WithConnectorStore // case). For WithC1ZPath, the store is populated later inside loadStore,