Skip to content
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
21 changes: 19 additions & 2 deletions pkg/sync/syncer.go
Original file line number Diff line number Diff line change
Expand Up @@ -140,6 +140,7 @@ type syncer struct {
injectSyncIDAnnotation bool
setSessionStore sessions.SetSessionStore
syncResourceTypes []string
minCheckpointInterval time.Duration
previousSyncMu native_sync.Mutex
previousSyncIDPtr atomic.Pointer[string]
workerCount int // If 0, sequential sync is used. If > 0, parallel sync is used.
Expand Down Expand Up @@ -169,11 +170,15 @@ func (a expanderStoreAdapter) StoreExpandedGrants(ctx context.Context, grants ..
return a.store.Grants().StoreExpandedGrants(ctx, grants...)
}

const minCheckpointInterval = 10 * time.Second
const defaultMinCheckpointInterval = 10 * time.Second

// Checkpoint marshals the current state and stores it.
func (s *syncer) Checkpoint(ctx context.Context, force bool) error {
if !force && !s.lastCheckPointTime.IsZero() && time.Since(s.lastCheckPointTime) < minCheckpointInterval {
interval := s.minCheckpointInterval
if interval == 0 {
interval = defaultMinCheckpointInterval
}
if !force && !s.lastCheckPointTime.IsZero() && time.Since(s.lastCheckPointTime) < interval {
return nil
}
ctx, span := tracer.Start(ctx, "syncer.Checkpoint")
Expand Down Expand Up @@ -2674,6 +2679,18 @@ func WithRunDuration(d time.Duration) SyncOpt {
}
}

// WithMinCheckpointInterval sets the minimum time between non-forced
// checkpoints. Checkpoints compress and persist the c1z file; for large
// tenants this dominates CPU. Higher values reduce checkpoint overhead at
// the cost of more lost work on a hard crash. Default 10s.
func WithMinCheckpointInterval(d time.Duration) SyncOpt {
return func(s *syncer) {
if d > 0 {
s.minCheckpointInterval = d
}
}
}

Comment on lines +2682 to +2693
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🟡 Suggestion: No tests were added for the new WithMinCheckpointInterval option. A small unit test verifying the option sets the field, rejects zero/negative durations, and that Checkpoint respects the configured interval would guard against regressions — especially since this is an exported SDK API that downstream callers will depend on.

// WithTransitionHandler sets a `transitionHandler` for `NewSyncer` Options.
func WithTransitionHandler(f func(s Action)) SyncOpt {
return func(s *syncer) {
Expand Down
Loading