diff --git a/pkg/sync/checkpoint_interval_test.go b/pkg/sync/checkpoint_interval_test.go new file mode 100644 index 000000000..b294f1f4b --- /dev/null +++ b/pkg/sync/checkpoint_interval_test.go @@ -0,0 +1,78 @@ +package sync //nolint:revive,nolintlint // existing package name preserved for backwards compatibility + +import ( + "testing" + "time" + + "github.com/stretchr/testify/require" +) + +func TestWithMinCheckpointInterval(t *testing.T) { + t.Run("sets positive duration", func(t *testing.T) { + s := &syncer{} + WithMinCheckpointInterval(5 * time.Second)(s) + require.Equal(t, 5*time.Second, s.minCheckpointInterval) + }) + + t.Run("zero is rejected (field stays at default-zero so Checkpoint falls back)", func(t *testing.T) { + s := &syncer{minCheckpointInterval: 7 * time.Second} + WithMinCheckpointInterval(0)(s) + require.Equal(t, 7*time.Second, s.minCheckpointInterval, + "zero must not overwrite a previously-set value") + }) + + t.Run("negative is rejected", func(t *testing.T) { + s := &syncer{minCheckpointInterval: 7 * time.Second} + WithMinCheckpointInterval(-1 * time.Second)(s) + require.Equal(t, 7*time.Second, s.minCheckpointInterval, + "negative must not overwrite a previously-set value") + }) + + t.Run("starting from zero stays zero for non-positive input", func(t *testing.T) { + s := &syncer{} + WithMinCheckpointInterval(0)(s) + require.Equal(t, time.Duration(0), s.minCheckpointInterval) + WithMinCheckpointInterval(-1)(s) + require.Equal(t, time.Duration(0), s.minCheckpointInterval) + }) +} + +// TestSyncerCheckpoint_IntervalGate exercises the interval gate on Checkpoint +// without touching state.Marshal or store.CheckpointSync. We force-set +// lastCheckPointTime so the gate is reached, then assert the early-return. +// A configured interval of 1 hour should short-circuit; the default (10s) +// should also short-circuit when lastCheckPointTime is "now". +func TestSyncerCheckpoint_IntervalGate(t *testing.T) { + t.Run("configured interval gates non-forced checkpoint", func(t *testing.T) { + s := &syncer{ + minCheckpointInterval: 1 * time.Hour, + lastCheckPointTime: time.Now(), + } + err := s.Checkpoint(t.Context(), false) + require.NoError(t, err, "interval gate must short-circuit before reaching store") + }) + + t.Run("default interval gates non-forced checkpoint right after a previous one", func(t *testing.T) { + s := &syncer{ + // minCheckpointInterval=0 → Checkpoint falls back to defaultMinCheckpointInterval (10s) + lastCheckPointTime: time.Now(), + } + err := s.Checkpoint(t.Context(), false) + require.NoError(t, err) + }) + + t.Run("zero lastCheckPointTime bypasses the gate", func(t *testing.T) { + // lastCheckPointTime.IsZero() → gate does not apply → Checkpoint proceeds + // to state.Marshal which will panic on a nil state. We use a deferred + // recover to assert we got past the early-return. + s := &syncer{ + minCheckpointInterval: 1 * time.Hour, + } + defer func() { + if r := recover(); r == nil { + t.Fatal("expected Checkpoint to proceed past the gate (and panic on nil state)") + } + }() + _ = s.Checkpoint(t.Context(), false) + }) +} diff --git a/pkg/sync/syncer.go b/pkg/sync/syncer.go index e7124eb56..23ab8cf21 100644 --- a/pkg/sync/syncer.go +++ b/pkg/sync/syncer.go @@ -140,6 +140,7 @@ type syncer struct { injectSyncIDAnnotation bool setSessionStore sessions.SetSessionStore syncResourceTypes []string + minCheckpointInterval time.Duration previousSyncMu native_sync.Mutex previousSyncIDPtr atomic.Pointer[string] workerCount int // If 1, sync is sequential (default). If > 1, sync operations are done in parallel. @@ -169,11 +170,15 @@ func (a expanderStoreAdapter) StoreExpandedGrants(ctx context.Context, grants .. return a.store.Grants().StoreExpandedGrants(ctx, grants...) } -const minCheckpointInterval = 10 * time.Second +const defaultMinCheckpointInterval = 10 * time.Second // Checkpoint marshals the current state and stores it. func (s *syncer) Checkpoint(ctx context.Context, force bool) error { - if !force && !s.lastCheckPointTime.IsZero() && time.Since(s.lastCheckPointTime) < minCheckpointInterval { + interval := s.minCheckpointInterval + if interval == 0 { + interval = defaultMinCheckpointInterval + } + if !force && !s.lastCheckPointTime.IsZero() && time.Since(s.lastCheckPointTime) < interval { return nil } ctx, span := tracer.Start(ctx, "syncer.Checkpoint") @@ -2735,6 +2740,18 @@ func WithRunDuration(d time.Duration) SyncOpt { } } +// WithMinCheckpointInterval sets the minimum time between non-forced +// checkpoints. Checkpoints compress and persist the c1z file; for large +// tenants this dominates CPU. Higher values reduce checkpoint overhead at +// the cost of more lost work on a hard crash. Default 10s. +func WithMinCheckpointInterval(d time.Duration) SyncOpt { + return func(s *syncer) { + if d > 0 { + s.minCheckpointInterval = d + } + } +} + // WithTransitionHandler sets a `transitionHandler` for `NewSyncer` Options. func WithTransitionHandler(f func(s Action)) SyncOpt { return func(s *syncer) {