diff --git a/pkg/sync/progresslog/progresslog.go b/pkg/sync/progresslog/progresslog.go index cacda05bc..d0e310ddf 100644 --- a/pkg/sync/progresslog/progresslog.go +++ b/pkg/sync/progresslog/progresslog.go @@ -143,7 +143,7 @@ func WithDBSizeProvider(p connectorstore.DBSizeProvider) Option { // WithMetricsHandler attaches an optional metrics.Handler. When set, // LogExpandProgress emits gauges/counters mirroring the structured log fields -// (actions_remaining, actions_burned, decompressed_bytes, decompressed_bytes_growth) +// (actions_remaining, actions_burned, decompressed_bytes, decompressed_bytes_delta) // so operators can build dashboards instead of scraping log timestamps. // // The handler should be pre-tagged by the caller (e.g. via Handler.WithTags diff --git a/pkg/sync/syncer.go b/pkg/sync/syncer.go index 47de7dc6e..05d213e63 100644 --- a/pkg/sync/syncer.go +++ b/pkg/sync/syncer.go @@ -43,6 +43,7 @@ import ( "github.com/conductorone/baton-sdk/pkg/annotations" "github.com/conductorone/baton-sdk/pkg/connectorstore" "github.com/conductorone/baton-sdk/pkg/dotc1z/manager" + "github.com/conductorone/baton-sdk/pkg/metrics" "github.com/conductorone/baton-sdk/pkg/sync/progresslog" "github.com/conductorone/baton-sdk/pkg/types" ) @@ -143,6 +144,7 @@ type syncer struct { previousSyncMu native_sync.Mutex previousSyncIDPtr atomic.Pointer[string] workerCount int // If 0, sequential sync is used. If > 0, parallel sync is used. + metricsHandler metrics.Handler } var _ Syncer = (*syncer)(nil) @@ -2816,6 +2818,24 @@ func NormalizeWorkerCount(count int) int { return max(count, 0) } +// WithMetricsHandler attaches a metrics.Handler that the syncer forwards to +// progresslog.NewProgressCounts so the grant-expansion OTel instruments +// (baton.sync.expand.actions_remaining / actions_burned / +// decompressed_bytes / decompressed_bytes_delta) actually reach the +// configured exporter instead of the default no-op handler. +// +// Callers should pre-tag the handler with the dimensions they want to slice +// by (e.g. tenant_id, connector_id) via Handler.WithTags before passing it +// in — baton-sdk has no view of those identifiers. +func WithMetricsHandler(h metrics.Handler) SyncOpt { + return func(s *syncer) { + if h == nil { + return + } + s.metricsHandler = h + } +} + // WithWorkerCount sets the number of workers to use. // If 0, sequential sync is used. If > 0, parallel sync is used. // If -1, the number of workers is set to the number of CPU cores or 4, whichever is lower. @@ -2846,6 +2866,9 @@ func NewSyncer(ctx context.Context, c types.ConnectorClient, opts ...SyncOpt) (S if s.workerCount > 0 { progressLogOpts = append(progressLogOpts, progresslog.WithSequentialMode(false)) } + if s.metricsHandler != nil { + progressLogOpts = append(progressLogOpts, progresslog.WithMetricsHandler(s.metricsHandler)) + } s.counts = progresslog.NewProgressCounts(ctx, progressLogOpts...) // Wire the DBSizeProvider now if the store is already set (WithConnectorStore // case). For WithC1ZPath, the store is populated later inside loadStore, diff --git a/pkg/sync/syncer_metrics_test.go b/pkg/sync/syncer_metrics_test.go new file mode 100644 index 000000000..45f56046a --- /dev/null +++ b/pkg/sync/syncer_metrics_test.go @@ -0,0 +1,120 @@ +package sync //nolint:revive,nolintlint // we can't change the package name for backwards compatibility + +import ( + "context" + "path/filepath" + native_sync "sync" + "testing" + + "github.com/conductorone/baton-sdk/pkg/metrics" + "github.com/conductorone/baton-sdk/pkg/sync/expand" + "github.com/stretchr/testify/require" +) + +// fakeMetricsHandler records gauge observations so the test can prove the +// caller's handler reaches the ProgressLog the syncer constructs. +type fakeMetricsHandler struct { + mu native_sync.Mutex + gauges map[string][]int64 +} + +func newFakeMetricsHandler() *fakeMetricsHandler { + return &fakeMetricsHandler{gauges: make(map[string][]int64)} +} + +func (h *fakeMetricsHandler) Int64Counter(_, _ string, _ metrics.Unit) metrics.Int64Counter { + return noopFakeCounter{} +} + +func (h *fakeMetricsHandler) Int64Gauge(name, _ string, _ metrics.Unit) metrics.Int64Gauge { + return &fakeMetricsGauge{handler: h, name: name} +} + +func (h *fakeMetricsHandler) Int64Histogram(_, _ string, _ metrics.Unit) metrics.Int64Histogram { + return noopFakeHistogram{} +} + +func (h *fakeMetricsHandler) WithTags(_ map[string]string) metrics.Handler { return h } + +func (h *fakeMetricsHandler) observations(name string) []int64 { + h.mu.Lock() + defer h.mu.Unlock() + out := make([]int64, len(h.gauges[name])) + copy(out, h.gauges[name]) + return out +} + +type fakeMetricsGauge struct { + handler *fakeMetricsHandler + name string +} + +func (g *fakeMetricsGauge) Observe(_ context.Context, value int64, _ map[string]string) { + g.handler.mu.Lock() + defer g.handler.mu.Unlock() + g.handler.gauges[g.name] = append(g.handler.gauges[g.name], value) +} + +type noopFakeCounter struct{} + +func (noopFakeCounter) Add(_ context.Context, _ int64, _ map[string]string) {} + +type noopFakeHistogram struct{} + +func (noopFakeHistogram) Record(_ context.Context, _ int64, _ map[string]string) {} + +// TestNewSyncer_WithMetricsHandler_WiresThroughToProgressLog pins the wiring +// added to forward a caller's metrics.Handler into progresslog.NewProgressCounts. +// Without this, a refactor that drops the conditional append inside NewSyncer +// would silently disable every consumer's baton.sync.expand.* instruments — +// the structured logs would still fire, so existing tests would stay green. +func TestNewSyncer_WithMetricsHandler_WiresThroughToProgressLog(t *testing.T) { + runWithSyncModes(t, func(t *testing.T, extraOpts []SyncOpt) { + ctx := t.Context() + + fake := newFakeMetricsHandler() + + tempDir := t.TempDir() + c1zpath := filepath.Join(tempDir, "wiring.c1z") + opts := append([]SyncOpt{ + WithC1ZPath(c1zpath), + WithTmpDir(tempDir), + WithMetricsHandler(fake), + }, extraOpts...) + + syncerIface, err := NewSyncer(ctx, nil, opts...) + require.NoError(t, err) + + s, ok := syncerIface.(*syncer) + require.True(t, ok, "NewSyncer returns *syncer") + require.NotNil(t, s.counts, "ProgressLog should be constructed") + + actions := make([]*expand.EntitlementGraphAction, 7) + s.counts.LogExpandProgress(ctx, actions) + + require.Equal(t, []int64{7}, fake.observations("baton.sync.expand.actions_remaining"), + "caller's metrics.Handler did not receive the actions_remaining gauge observation — "+ + "WithMetricsHandler wiring in NewSyncer is broken") + }) +} + +// TestNewSyncer_NoMetricsHandler_DoesNotPanic pins the default path: callers +// that omit WithMetricsHandler should get a working ProgressLog backed by the +// no-op handler. Mirrors progresslog's own NoMetricsHandlerIsSafe coverage +// at the syncer wiring layer. +func TestNewSyncer_NoMetricsHandler_DoesNotPanic(t *testing.T) { + ctx := t.Context() + + tempDir := t.TempDir() + c1zpath := filepath.Join(tempDir, "wiring.c1z") + syncerIface, err := NewSyncer(ctx, nil, WithC1ZPath(c1zpath), WithTmpDir(tempDir)) + require.NoError(t, err) + + s, ok := syncerIface.(*syncer) + require.True(t, ok) + require.NotNil(t, s.counts) + + require.NotPanics(t, func() { + s.counts.LogExpandProgress(ctx, make([]*expand.EntitlementGraphAction, 3)) + }) +}