From 53edf06b0af5c8df3113147dfe576cda60f71855 Mon Sep 17 00:00:00 2001 From: arreyder Date: Sun, 24 May 2026 09:16:16 -0500 Subject: [PATCH 1/2] perf(sync): add WithMinCheckpointInterval option MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Checkpoints compress and persist the entire c1z SQLite file. For large tenants (Eli Lilly: 72 GB c1z), this dominates CPU — profile shows 60% of Lilly's sync time in saveC1z → zstd.Encode → io.Copy on every step. The existing 10s throttle (minCheckpointInterval) is hardcoded. Add WithMinCheckpointInterval(d) as a SyncOpt so callers (c1's sync activity) can tune per-tenant or per-connector-size. Higher values reduce checkpoint I/O at the cost of more lost work on a hard crash (not a clean activity-window timeout, which always checkpoints via force=true). Lilly's GLB connector has had zero hard crashes in 15 days of data — only clean timeouts. Example usage from c1: syncer.New(connector, opts..., sync.WithMinCheckpointInterval(2 * time.Minute), ) Co-Authored-By: Claude Sonnet 4.5 --- pkg/sync/syncer.go | 21 +++++++++++++++++++-- 1 file changed, 19 insertions(+), 2 deletions(-) diff --git a/pkg/sync/syncer.go b/pkg/sync/syncer.go index e7124eb56..23ab8cf21 100644 --- a/pkg/sync/syncer.go +++ b/pkg/sync/syncer.go @@ -140,6 +140,7 @@ type syncer struct { injectSyncIDAnnotation bool setSessionStore sessions.SetSessionStore syncResourceTypes []string + minCheckpointInterval time.Duration previousSyncMu native_sync.Mutex previousSyncIDPtr atomic.Pointer[string] workerCount int // If 1, sync is sequential (default). If > 1, sync operations are done in parallel. @@ -169,11 +170,15 @@ func (a expanderStoreAdapter) StoreExpandedGrants(ctx context.Context, grants .. return a.store.Grants().StoreExpandedGrants(ctx, grants...) } -const minCheckpointInterval = 10 * time.Second +const defaultMinCheckpointInterval = 10 * time.Second // Checkpoint marshals the current state and stores it. func (s *syncer) Checkpoint(ctx context.Context, force bool) error { - if !force && !s.lastCheckPointTime.IsZero() && time.Since(s.lastCheckPointTime) < minCheckpointInterval { + interval := s.minCheckpointInterval + if interval == 0 { + interval = defaultMinCheckpointInterval + } + if !force && !s.lastCheckPointTime.IsZero() && time.Since(s.lastCheckPointTime) < interval { return nil } ctx, span := tracer.Start(ctx, "syncer.Checkpoint") @@ -2735,6 +2740,18 @@ func WithRunDuration(d time.Duration) SyncOpt { } } +// WithMinCheckpointInterval sets the minimum time between non-forced +// checkpoints. Checkpoints compress and persist the c1z file; for large +// tenants this dominates CPU. Higher values reduce checkpoint overhead at +// the cost of more lost work on a hard crash. Default 10s. +func WithMinCheckpointInterval(d time.Duration) SyncOpt { + return func(s *syncer) { + if d > 0 { + s.minCheckpointInterval = d + } + } +} + // WithTransitionHandler sets a `transitionHandler` for `NewSyncer` Options. func WithTransitionHandler(f func(s Action)) SyncOpt { return func(s *syncer) { From ff558a96bcc9d7e2c182732c1ccddb5f3d212438 Mon Sep 17 00:00:00 2001 From: arreyder Date: Wed, 27 May 2026 18:12:36 -0500 Subject: [PATCH 2/2] test: cover WithMinCheckpointInterval option mechanics MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Addresses bot review on #865 — no tests existed for the new exported option. Covers: - positive duration sets the field - zero/negative leave the field unchanged (gate falls back to default) - Checkpoint's interval gate short-circuits when within the interval Co-Authored-By: Claude Opus 4.7 (1M context) --- pkg/sync/checkpoint_interval_test.go | 78 ++++++++++++++++++++++++++++ 1 file changed, 78 insertions(+) create mode 100644 pkg/sync/checkpoint_interval_test.go diff --git a/pkg/sync/checkpoint_interval_test.go b/pkg/sync/checkpoint_interval_test.go new file mode 100644 index 000000000..b294f1f4b --- /dev/null +++ b/pkg/sync/checkpoint_interval_test.go @@ -0,0 +1,78 @@ +package sync //nolint:revive,nolintlint // existing package name preserved for backwards compatibility + +import ( + "testing" + "time" + + "github.com/stretchr/testify/require" +) + +func TestWithMinCheckpointInterval(t *testing.T) { + t.Run("sets positive duration", func(t *testing.T) { + s := &syncer{} + WithMinCheckpointInterval(5 * time.Second)(s) + require.Equal(t, 5*time.Second, s.minCheckpointInterval) + }) + + t.Run("zero is rejected (field stays at default-zero so Checkpoint falls back)", func(t *testing.T) { + s := &syncer{minCheckpointInterval: 7 * time.Second} + WithMinCheckpointInterval(0)(s) + require.Equal(t, 7*time.Second, s.minCheckpointInterval, + "zero must not overwrite a previously-set value") + }) + + t.Run("negative is rejected", func(t *testing.T) { + s := &syncer{minCheckpointInterval: 7 * time.Second} + WithMinCheckpointInterval(-1 * time.Second)(s) + require.Equal(t, 7*time.Second, s.minCheckpointInterval, + "negative must not overwrite a previously-set value") + }) + + t.Run("starting from zero stays zero for non-positive input", func(t *testing.T) { + s := &syncer{} + WithMinCheckpointInterval(0)(s) + require.Equal(t, time.Duration(0), s.minCheckpointInterval) + WithMinCheckpointInterval(-1)(s) + require.Equal(t, time.Duration(0), s.minCheckpointInterval) + }) +} + +// TestSyncerCheckpoint_IntervalGate exercises the interval gate on Checkpoint +// without touching state.Marshal or store.CheckpointSync. We force-set +// lastCheckPointTime so the gate is reached, then assert the early-return. +// A configured interval of 1 hour should short-circuit; the default (10s) +// should also short-circuit when lastCheckPointTime is "now". +func TestSyncerCheckpoint_IntervalGate(t *testing.T) { + t.Run("configured interval gates non-forced checkpoint", func(t *testing.T) { + s := &syncer{ + minCheckpointInterval: 1 * time.Hour, + lastCheckPointTime: time.Now(), + } + err := s.Checkpoint(t.Context(), false) + require.NoError(t, err, "interval gate must short-circuit before reaching store") + }) + + t.Run("default interval gates non-forced checkpoint right after a previous one", func(t *testing.T) { + s := &syncer{ + // minCheckpointInterval=0 → Checkpoint falls back to defaultMinCheckpointInterval (10s) + lastCheckPointTime: time.Now(), + } + err := s.Checkpoint(t.Context(), false) + require.NoError(t, err) + }) + + t.Run("zero lastCheckPointTime bypasses the gate", func(t *testing.T) { + // lastCheckPointTime.IsZero() → gate does not apply → Checkpoint proceeds + // to state.Marshal which will panic on a nil state. We use a deferred + // recover to assert we got past the early-return. + s := &syncer{ + minCheckpointInterval: 1 * time.Hour, + } + defer func() { + if r := recover(); r == nil { + t.Fatal("expected Checkpoint to proceed past the gate (and panic on nil state)") + } + }() + _ = s.Checkpoint(t.Context(), false) + }) +}