Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion pkg/sync/progresslog/progresslog.go
Original file line number Diff line number Diff line change
Expand Up @@ -143,7 +143,7 @@ func WithDBSizeProvider(p connectorstore.DBSizeProvider) Option {

// WithMetricsHandler attaches an optional metrics.Handler. When set,
// LogExpandProgress emits gauges/counters mirroring the structured log fields
// (actions_remaining, actions_burned, decompressed_bytes, decompressed_bytes_growth)
// (actions_remaining, actions_burned, decompressed_bytes, decompressed_bytes_delta)
// so operators can build dashboards instead of scraping log timestamps.
//
// The handler should be pre-tagged by the caller (e.g. via Handler.WithTags
Expand Down
23 changes: 23 additions & 0 deletions pkg/sync/syncer.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@ import (
"github.com/conductorone/baton-sdk/pkg/annotations"
"github.com/conductorone/baton-sdk/pkg/connectorstore"
"github.com/conductorone/baton-sdk/pkg/dotc1z/manager"
"github.com/conductorone/baton-sdk/pkg/metrics"
"github.com/conductorone/baton-sdk/pkg/sync/progresslog"
"github.com/conductorone/baton-sdk/pkg/types"
)
Expand Down Expand Up @@ -143,6 +144,7 @@ type syncer struct {
previousSyncMu native_sync.Mutex
previousSyncIDPtr atomic.Pointer[string]
workerCount int // If 0, sequential sync is used. If > 0, parallel sync is used.
metricsHandler metrics.Handler
}

var _ Syncer = (*syncer)(nil)
Expand Down Expand Up @@ -2816,6 +2818,24 @@ func NormalizeWorkerCount(count int) int {
return max(count, 0)
}

// WithMetricsHandler attaches a metrics.Handler that the syncer forwards to
// progresslog.NewProgressCounts so the grant-expansion OTel instruments
// (baton.sync.expand.actions_remaining / actions_burned /
// decompressed_bytes / decompressed_bytes_delta) actually reach the
// configured exporter instead of the default no-op handler.
//
// Callers should pre-tag the handler with the dimensions they want to slice
// by (e.g. tenant_id, connector_id) via Handler.WithTags before passing it
// in — baton-sdk has no view of those identifiers.
func WithMetricsHandler(h metrics.Handler) SyncOpt {
return func(s *syncer) {
if h == nil {
return
}
s.metricsHandler = h
}
}

// WithWorkerCount sets the number of workers to use.
// If 0, sequential sync is used. If > 0, parallel sync is used.
// If -1, the number of workers is set to the number of CPU cores or 4, whichever is lower.
Expand Down Expand Up @@ -2846,6 +2866,9 @@ func NewSyncer(ctx context.Context, c types.ConnectorClient, opts ...SyncOpt) (S
if s.workerCount > 0 {
progressLogOpts = append(progressLogOpts, progresslog.WithSequentialMode(false))
}
if s.metricsHandler != nil {
progressLogOpts = append(progressLogOpts, progresslog.WithMetricsHandler(s.metricsHandler))
}
s.counts = progresslog.NewProgressCounts(ctx, progressLogOpts...)
// Wire the DBSizeProvider now if the store is already set (WithConnectorStore
// case). For WithC1ZPath, the store is populated later inside loadStore,
Expand Down
120 changes: 120 additions & 0 deletions pkg/sync/syncer_metrics_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,120 @@
package sync //nolint:revive,nolintlint // we can't change the package name for backwards compatibility

import (
"context"
"path/filepath"
native_sync "sync"
"testing"

"github.com/conductorone/baton-sdk/pkg/metrics"
"github.com/conductorone/baton-sdk/pkg/sync/expand"
"github.com/stretchr/testify/require"
)

// fakeMetricsHandler records gauge observations so the test can prove the
// caller's handler reaches the ProgressLog the syncer constructs.
type fakeMetricsHandler struct {
mu native_sync.Mutex
gauges map[string][]int64
}

func newFakeMetricsHandler() *fakeMetricsHandler {
return &fakeMetricsHandler{gauges: make(map[string][]int64)}
}

func (h *fakeMetricsHandler) Int64Counter(_, _ string, _ metrics.Unit) metrics.Int64Counter {
return noopFakeCounter{}
}

func (h *fakeMetricsHandler) Int64Gauge(name, _ string, _ metrics.Unit) metrics.Int64Gauge {
return &fakeMetricsGauge{handler: h, name: name}
}

func (h *fakeMetricsHandler) Int64Histogram(_, _ string, _ metrics.Unit) metrics.Int64Histogram {
return noopFakeHistogram{}
}

func (h *fakeMetricsHandler) WithTags(_ map[string]string) metrics.Handler { return h }

func (h *fakeMetricsHandler) observations(name string) []int64 {
h.mu.Lock()
defer h.mu.Unlock()
out := make([]int64, len(h.gauges[name]))
copy(out, h.gauges[name])
return out
}

type fakeMetricsGauge struct {
handler *fakeMetricsHandler
name string
}

func (g *fakeMetricsGauge) Observe(_ context.Context, value int64, _ map[string]string) {
g.handler.mu.Lock()
defer g.handler.mu.Unlock()
g.handler.gauges[g.name] = append(g.handler.gauges[g.name], value)
}

type noopFakeCounter struct{}

func (noopFakeCounter) Add(_ context.Context, _ int64, _ map[string]string) {}

type noopFakeHistogram struct{}

func (noopFakeHistogram) Record(_ context.Context, _ int64, _ map[string]string) {}

// TestNewSyncer_WithMetricsHandler_WiresThroughToProgressLog pins the wiring
// added to forward a caller's metrics.Handler into progresslog.NewProgressCounts.
// Without this, a refactor that drops the conditional append inside NewSyncer
// would silently disable every consumer's baton.sync.expand.* instruments —
// the structured logs would still fire, so existing tests would stay green.
func TestNewSyncer_WithMetricsHandler_WiresThroughToProgressLog(t *testing.T) {
runWithSyncModes(t, func(t *testing.T, extraOpts []SyncOpt) {
ctx := t.Context()

fake := newFakeMetricsHandler()

tempDir := t.TempDir()
c1zpath := filepath.Join(tempDir, "wiring.c1z")
opts := append([]SyncOpt{
WithC1ZPath(c1zpath),
WithTmpDir(tempDir),
WithMetricsHandler(fake),
}, extraOpts...)

syncerIface, err := NewSyncer(ctx, nil, opts...)
require.NoError(t, err)

s, ok := syncerIface.(*syncer)
require.True(t, ok, "NewSyncer returns *syncer")
require.NotNil(t, s.counts, "ProgressLog should be constructed")

actions := make([]*expand.EntitlementGraphAction, 7)
s.counts.LogExpandProgress(ctx, actions)

require.Equal(t, []int64{7}, fake.observations("baton.sync.expand.actions_remaining"),
"caller's metrics.Handler did not receive the actions_remaining gauge observation — "+
"WithMetricsHandler wiring in NewSyncer is broken")
})
}

// TestNewSyncer_NoMetricsHandler_DoesNotPanic pins the default path: callers
// that omit WithMetricsHandler should get a working ProgressLog backed by the
// no-op handler. Mirrors progresslog's own NoMetricsHandlerIsSafe coverage
// at the syncer wiring layer.
func TestNewSyncer_NoMetricsHandler_DoesNotPanic(t *testing.T) {
ctx := t.Context()

tempDir := t.TempDir()
c1zpath := filepath.Join(tempDir, "wiring.c1z")
syncerIface, err := NewSyncer(ctx, nil, WithC1ZPath(c1zpath), WithTmpDir(tempDir))
require.NoError(t, err)

s, ok := syncerIface.(*syncer)
require.True(t, ok)
require.NotNil(t, s.counts)

require.NotPanics(t, func() {
s.counts.LogExpandProgress(ctx, make([]*expand.EntitlementGraphAction, 3))
})
}
Loading