From ef0ecb364fdc4b94ccdd77f1a7eb1e2255e8f990 Mon Sep 17 00:00:00 2001 From: arreyder Date: Sat, 23 May 2026 21:54:19 -0500 Subject: [PATCH 1/5] perf(expand): prefetch descendant grants to eliminate per-principal SQL queries MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit The grant expansion inner loop previously issued one ListGrantsForEntitlement SQL query per source grant to check whether the principal already holds a grant on the descendant entitlement. For large tenants (Eli Lilly: 180K users, 139K groups), a single expansion action issued 180K individual SQLite queries — each cheap in isolation but taking 38 minutes total due to pure-Go SQLite per-query overhead (modernc.org/sqlite). Replace the per-principal inner loop with a single bulk pre-fetch of all grants on the descendant entitlement into an in-memory map keyed by (principal_resource_type_id, principal_resource_id). The source grant loop then does O(1) map lookups instead of O(N) SQL queries. Expected impact for Lilly's GLB Entra connector: Before: 180K SQL queries per action → ~38 minutes After: 1 full scan of descendant (paginated) + map lookups → seconds The pre-fetch is per-runAction call (not cached across actions) to ensure correctness in diamond graphs where action A→C creates grants that action B→C must see as pre-existing. Co-Authored-By: Claude Sonnet 4.5 --- pkg/sync/expand/expander.go | 110 ++++++++++++++++++------------------ 1 file changed, 55 insertions(+), 55 deletions(-) diff --git a/pkg/sync/expand/expander.go b/pkg/sync/expand/expander.go index 2018a3f28..ab1905ad4 100644 --- a/pkg/sync/expand/expander.go +++ b/pkg/sync/expand/expander.go @@ -148,6 +148,39 @@ func (e *Expander) IsDone(ctx context.Context) bool { return e.graph.IsExpanded() } +func descendantGrantKey(resourceTypeID, resourceID string) string { + return resourceTypeID + "\x00" + resourceID +} + +func prefetchDescendantGrants( + ctx context.Context, + store ExpanderStore, + entitlement *v2.Entitlement, +) (map[string][]*v2.Grant, error) { + result := make(map[string][]*v2.Grant) + pageToken := "" + for { + resp, err := store.ListGrantsForEntitlement(ctx, + reader_v2.GrantsReaderServiceListGrantsForEntitlementRequest_builder{ + Entitlement: entitlement, + PageToken: pageToken, + }.Build()) + if err != nil { + return nil, fmt.Errorf("prefetchDescendantGrants: %w", err) + } + for _, g := range resp.GetList() { + principal := g.GetPrincipal().GetId() + key := descendantGrantKey(principal.GetResourceType(), principal.GetResource()) + result[key] = append(result[key], g) + } + pageToken = resp.GetNextPageToken() + if pageToken == "" { + break + } + } + return result, nil +} + // runAction processes a single action and returns the next page token. // If the returned page token is empty, the action is complete. // @@ -208,65 +241,41 @@ func (e *Expander) runAction(ctx context.Context, action *EntitlementGraphAction return "", fmt.Errorf("runAction: error fetching source grants: %w", err) } + descendantByPrincipal, err := prefetchDescendantGrants(ctx, e.store, descendantEntitlement.GetEntitlement()) + if err != nil { + l.Error("runAction: error prefetching descendant grants", zap.Error(err)) + return "", fmt.Errorf("runAction: error prefetching descendant grants: %w", err) + } + span.SetAttributes(attribute.Int("descendant_prefetch_count", len(descendantByPrincipal))) + var newGrants = make([]*v2.Grant, 0) for _, sourceGrant := range sourceGrants.GetList() { - // If this is a shallow action, then we only want to expand grants that have no sources - // which indicates that it was directly assigned. if action.Shallow { sourcesMap := sourceGrant.GetSources().GetSources() - // If we have no sources, this is a direct grant foundDirectGrant := len(sourcesMap) == 0 - // If the source grant has sources, then we need to see if any of them are the source entitlement itself if sourcesMap[action.SourceEntitlementID] != nil { foundDirectGrant = true } - - // This is not a direct grant, so skip it since we are a shallow action if !foundDirectGrant { continue } } - // Determine if the source grant is direct: either it has no sources (never expanded), - // or it has a self-reference (direct grant that was also expanded). sgSources := sourceGrant.GetSources().GetSources() isSourceDirect := len(sgSources) == 0 || sgSources[action.SourceEntitlementID] != nil - // Unroll all grants for the principal on the descendant entitlement. - pageToken := "" - for { - req := reader_v2.GrantsReaderServiceListGrantsForEntitlementRequest_builder{ - Entitlement: descendantEntitlement.GetEntitlement(), - PrincipalId: sourceGrant.GetPrincipal().GetId(), - PageToken: pageToken, - Annotations: nil, - }.Build() + principal := sourceGrant.GetPrincipal().GetId() + key := descendantGrantKey(principal.GetResourceType(), principal.GetResource()) + descendantGrants := descendantByPrincipal[key] - resp, err := e.store.ListGrantsForEntitlement(ctx, req) + if len(descendantGrants) == 0 { + descendantGrant, err := newExpandedGrant(descendantEntitlement.GetEntitlement(), sourceGrant.GetPrincipal(), action.SourceEntitlementID, isSourceDirect) if err != nil { - l.Error("runAction: error fetching descendant grants", zap.Error(err)) - return "", fmt.Errorf("runAction: error fetching descendant grants: %w", err) + l.Error("runAction: error creating new grant", zap.Error(err)) + return "", fmt.Errorf("runAction: error creating new grant: %w", err) } - descendantGrants := resp.GetList() - - // If we have no grants for the principal in the descendant entitlement, make one. - if pageToken == "" && resp.GetNextPageToken() == "" && len(descendantGrants) == 0 { - descendantGrant, err := newExpandedGrant(descendantEntitlement.GetEntitlement(), sourceGrant.GetPrincipal(), action.SourceEntitlementID, isSourceDirect) - if err != nil { - l.Error("runAction: error creating new grant", zap.Error(err)) - return "", fmt.Errorf("runAction: error creating new grant: %w", err) - } - newGrants = append(newGrants, descendantGrant) - newGrants, err = PutGrantsInChunks(ctx, e.store, newGrants, 10000) - if err != nil { - l.Error("runAction: error updating descendant grants", zap.Error(err)) - return "", fmt.Errorf("runAction: error updating descendant grants: %w", err) - } - break - } - - // Add the source entitlement as a source to all descendant grants. - grantsToUpdate := make([]*v2.Grant, 0) + newGrants = append(newGrants, descendantGrant) + } else { for _, descendantGrant := range descendantGrants { sourcesMap := descendantGrant.GetSources().GetSources() if sourcesMap == nil { @@ -274,13 +283,10 @@ func (e *Expander) runAction(ctx context.Context, action *EntitlementGraphAction } updated := false - if len(sourcesMap) == 0 { - // If we are already granted this entitlement, make sure to add ourselves as a source. sourcesMap[action.DescendantEntitlementID] = &v2.GrantSources_GrantSource{IsDirect: true} updated = true } - // Include the source grant as a source. if sourcesMap[action.SourceEntitlementID] == nil { sourcesMap[action.SourceEntitlementID] = &v2.GrantSources_GrantSource{IsDirect: isSourceDirect} updated = true @@ -289,21 +295,15 @@ func (e *Expander) runAction(ctx context.Context, action *EntitlementGraphAction if updated { sources := v2.GrantSources_builder{Sources: sourcesMap}.Build() descendantGrant.SetSources(sources) - grantsToUpdate = append(grantsToUpdate, descendantGrant) + newGrants = append(newGrants, descendantGrant) } } - newGrants = append(newGrants, grantsToUpdate...) - - newGrants, err = PutGrantsInChunks(ctx, e.store, newGrants, 10000) - if err != nil { - l.Error("runAction: error updating descendant grants", zap.Error(err)) - return "", fmt.Errorf("runAction: error updating descendant grants: %w", err) - } + } - pageToken = resp.GetNextPageToken() - if pageToken == "" { - break - } + newGrants, err = PutGrantsInChunks(ctx, e.store, newGrants, 10000) + if err != nil { + l.Error("runAction: error updating descendant grants", zap.Error(err)) + return "", fmt.Errorf("runAction: error updating descendant grants: %w", err) } } From 3627df664f63eb09f8c5ac972a78c070da103fec Mon Sep 17 00:00:00 2001 From: arreyder Date: Sat, 23 May 2026 22:01:51 -0500 Subject: [PATCH 2/5] review: add prefetch cache + context check + page safety valve + nil-principal guard MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Addresses code review findings on PR #863: 1. Cache descendant prefetch across pages of the same action (High): the same descendant entitlement is now fetched once and reused for all source-grant pages within a single action. Cache invalidated when the action completes and moves to the next edge. 2. Context cancellation check in prefetch loop (Medium): ctx.Err() is checked at the top of each page iteration so the loop exits promptly on timeout or shutdown. 3. Page-count safety valve (Critical → defense-in-depth): maxPrefetchPages (10,000) bounds the loop against infinite pagination from a buggy store. At 10K pages × 10K page size = 100M grants, well beyond any realistic entitlement. 4. Nil-principal skip (Low → defensive): source grants with nil principals are skipped rather than producing a key collision on "\x00". Matches newExpandedGrant's nil rejection. Co-Authored-By: Claude Sonnet 4.5 --- pkg/sync/expand/expander.go | 30 +++++++++++++++++++++++++----- 1 file changed, 25 insertions(+), 5 deletions(-) diff --git a/pkg/sync/expand/expander.go b/pkg/sync/expand/expander.go index ab1905ad4..58ba4c5eb 100644 --- a/pkg/sync/expand/expander.go +++ b/pkg/sync/expand/expander.go @@ -48,6 +48,9 @@ type ExpanderStore interface { type Expander struct { store ExpanderStore graph *EntitlementGraph + + prefetchedDescendantID string + prefetchedDescendants map[string][]*v2.Grant } // NewExpander creates a new Expander with the given store and graph. @@ -107,6 +110,8 @@ func (e *Expander) RunSingleStep(ctx context.Context) error { // Action is complete - mark edge expanded and remove from queue e.graph.MarkEdgeExpanded(action.SourceEntitlementID, action.DescendantEntitlementID) e.graph.Actions = e.graph.Actions[1:] + e.prefetchedDescendantID = "" + e.prefetchedDescendants = nil } } @@ -148,19 +153,28 @@ func (e *Expander) IsDone(ctx context.Context) bool { return e.graph.IsExpanded() } +const maxPrefetchPages = 10000 + func descendantGrantKey(resourceTypeID, resourceID string) string { return resourceTypeID + "\x00" + resourceID } -func prefetchDescendantGrants( +func (e *Expander) getDescendantGrants( ctx context.Context, - store ExpanderStore, entitlement *v2.Entitlement, ) (map[string][]*v2.Grant, error) { + entID := entitlement.GetId() + if e.prefetchedDescendantID == entID && e.prefetchedDescendants != nil { + return e.prefetchedDescendants, nil + } + result := make(map[string][]*v2.Grant) pageToken := "" - for { - resp, err := store.ListGrantsForEntitlement(ctx, + for page := 0; page < maxPrefetchPages; page++ { + if err := ctx.Err(); err != nil { + return nil, err + } + resp, err := e.store.ListGrantsForEntitlement(ctx, reader_v2.GrantsReaderServiceListGrantsForEntitlementRequest_builder{ Entitlement: entitlement, PageToken: pageToken, @@ -178,6 +192,9 @@ func prefetchDescendantGrants( break } } + + e.prefetchedDescendantID = entID + e.prefetchedDescendants = result return result, nil } @@ -241,7 +258,7 @@ func (e *Expander) runAction(ctx context.Context, action *EntitlementGraphAction return "", fmt.Errorf("runAction: error fetching source grants: %w", err) } - descendantByPrincipal, err := prefetchDescendantGrants(ctx, e.store, descendantEntitlement.GetEntitlement()) + descendantByPrincipal, err := e.getDescendantGrants(ctx, descendantEntitlement.GetEntitlement()) if err != nil { l.Error("runAction: error prefetching descendant grants", zap.Error(err)) return "", fmt.Errorf("runAction: error prefetching descendant grants: %w", err) @@ -250,6 +267,9 @@ func (e *Expander) runAction(ctx context.Context, action *EntitlementGraphAction var newGrants = make([]*v2.Grant, 0) for _, sourceGrant := range sourceGrants.GetList() { + if sourceGrant.GetPrincipal() == nil { + continue + } if action.Shallow { sourcesMap := sourceGrant.GetSources().GetSources() foundDirectGrant := len(sourcesMap) == 0 From a90304c7c16ff87fac58bcc104ae20d17f216204 Mon Sep 17 00:00:00 2001 From: arreyder Date: Sun, 24 May 2026 08:22:00 -0500 Subject: [PATCH 3/5] review: remove Expander-level cache; keep per-call prefetch MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Addresses alan-lee-12's review: the syncer creates a new Expander per RunSingleStep (syncer.go:2555), so struct-level cache fields don't persist across source-grant pages in production. The benchmark (expander.Run) was overly optimistic because it reuses one Expander. Remove the prefetchedDescendantID/prefetchedDescendants fields and the cache-invalidation logic. The core optimization is unchanged: each runAction call does ONE bulk scan of the descendant entitlement (prefetchDescendantGrants) instead of N per-principal SQL queries. This is the source of the 70% benchmark win — the cross-page cache was bonus amortization that only helped the Run() path. Production behavior per page: Before: 10K individual point-lookup SQL queries After: 1 bulk scan + map lookups The cross-page staleness concern is also resolved: since there's no cache, each page always sees the latest store state. Co-Authored-By: Claude Sonnet 4.5 --- pkg/sync/expand/expander.go | 20 ++++---------------- 1 file changed, 4 insertions(+), 16 deletions(-) diff --git a/pkg/sync/expand/expander.go b/pkg/sync/expand/expander.go index 58ba4c5eb..0801ac2d0 100644 --- a/pkg/sync/expand/expander.go +++ b/pkg/sync/expand/expander.go @@ -48,9 +48,6 @@ type ExpanderStore interface { type Expander struct { store ExpanderStore graph *EntitlementGraph - - prefetchedDescendantID string - prefetchedDescendants map[string][]*v2.Grant } // NewExpander creates a new Expander with the given store and graph. @@ -110,8 +107,6 @@ func (e *Expander) RunSingleStep(ctx context.Context) error { // Action is complete - mark edge expanded and remove from queue e.graph.MarkEdgeExpanded(action.SourceEntitlementID, action.DescendantEntitlementID) e.graph.Actions = e.graph.Actions[1:] - e.prefetchedDescendantID = "" - e.prefetchedDescendants = nil } } @@ -159,22 +154,18 @@ func descendantGrantKey(resourceTypeID, resourceID string) string { return resourceTypeID + "\x00" + resourceID } -func (e *Expander) getDescendantGrants( +func prefetchDescendantGrants( ctx context.Context, + store ExpanderStore, entitlement *v2.Entitlement, ) (map[string][]*v2.Grant, error) { - entID := entitlement.GetId() - if e.prefetchedDescendantID == entID && e.prefetchedDescendants != nil { - return e.prefetchedDescendants, nil - } - result := make(map[string][]*v2.Grant) pageToken := "" for page := 0; page < maxPrefetchPages; page++ { if err := ctx.Err(); err != nil { return nil, err } - resp, err := e.store.ListGrantsForEntitlement(ctx, + resp, err := store.ListGrantsForEntitlement(ctx, reader_v2.GrantsReaderServiceListGrantsForEntitlementRequest_builder{ Entitlement: entitlement, PageToken: pageToken, @@ -192,9 +183,6 @@ func (e *Expander) getDescendantGrants( break } } - - e.prefetchedDescendantID = entID - e.prefetchedDescendants = result return result, nil } @@ -258,7 +246,7 @@ func (e *Expander) runAction(ctx context.Context, action *EntitlementGraphAction return "", fmt.Errorf("runAction: error fetching source grants: %w", err) } - descendantByPrincipal, err := e.getDescendantGrants(ctx, descendantEntitlement.GetEntitlement()) + descendantByPrincipal, err := prefetchDescendantGrants(ctx, e.store, descendantEntitlement.GetEntitlement()) if err != nil { l.Error("runAction: error prefetching descendant grants", zap.Error(err)) return "", fmt.Errorf("runAction: error prefetching descendant grants: %w", err) From cb8a2b6338dc21728954afa704245dec29d83094 Mon Sep 17 00:00:00 2001 From: arreyder Date: Tue, 26 May 2026 13:34:34 -0500 Subject: [PATCH 4/5] review: return ErrPrefetchPageCapExceeded instead of a partial map MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit kans flagged "what happens if we hit maxPrefetchPages?" Tracing the caller, a truncated map would cause runAction to see the missing principals as "no existing descendant grant" and call newExpandedGrant instead of merging into the existing grant's sources map — producing duplicate grants with wrong source attribution. Return ErrPrefetchPageCapExceeded so the caller surfaces the failure instead of silently corrupting expansion output. Add a covering test that drives a stub store which always returns a non-empty NextPageToken. --- pkg/sync/expand/expander.go | 12 +++++++++-- pkg/sync/expand/expander_test.go | 34 ++++++++++++++++++++++++++++++++ 2 files changed, 44 insertions(+), 2 deletions(-) diff --git a/pkg/sync/expand/expander.go b/pkg/sync/expand/expander.go index 0801ac2d0..7e0930d84 100644 --- a/pkg/sync/expand/expander.go +++ b/pkg/sync/expand/expander.go @@ -150,6 +150,13 @@ func (e *Expander) IsDone(ctx context.Context) bool { const maxPrefetchPages = 10000 +// ErrPrefetchPageCapExceeded means the prefetch hit maxPrefetchPages without +// reaching the last page. Returning a partial map is unsafe — runAction would +// see the missing principals as "no existing descendant grant" and emit a +// fresh grant instead of merging sources, producing duplicate grants with +// wrong source attribution. Caller should surface this rather than continue. +var ErrPrefetchPageCapExceeded = errors.New("descendant-grant prefetch exceeded maxPrefetchPages cap") + func descendantGrantKey(resourceTypeID, resourceID string) string { return resourceTypeID + "\x00" + resourceID } @@ -180,10 +187,11 @@ func prefetchDescendantGrants( } pageToken = resp.GetNextPageToken() if pageToken == "" { - break + return result, nil } } - return result, nil + return nil, fmt.Errorf("prefetchDescendantGrants: %w (entitlement=%s, pages=%d)", + ErrPrefetchPageCapExceeded, entitlement.GetId(), maxPrefetchPages) } // runAction processes a single action and returns the next page token. diff --git a/pkg/sync/expand/expander_test.go b/pkg/sync/expand/expander_test.go index 3c0b508a1..5000aaaa4 100644 --- a/pkg/sync/expand/expander_test.go +++ b/pkg/sync/expand/expander_test.go @@ -454,3 +454,37 @@ func TestExpanderMixedDirectness(t *testing.T) { require.Contains(t, sourcesC, entB.GetId()) require.False(t, sourcesC[entB.GetId()].GetIsDirect(), "source B should be transitive") } + +// pageCapStubStore always returns a non-empty NextPageToken so the prefetch +// loop never terminates naturally. Used to exercise the maxPrefetchPages cap. +type pageCapStubStore struct { + MockExpanderStore + calls int +} + +func (s *pageCapStubStore) ListGrantsForEntitlement( + _ context.Context, + _ *reader_v2.GrantsReaderServiceListGrantsForEntitlementRequest, +) (*reader_v2.GrantsReaderServiceListGrantsForEntitlementResponse, error) { + s.calls++ + return reader_v2.GrantsReaderServiceListGrantsForEntitlementResponse_builder{ + List: []*v2.Grant{}, + NextPageToken: "more", + }.Build(), nil +} + +// TestPrefetchDescendantGrants_PageCapExceeded locks in that the prefetch +// errors instead of returning a partial map when maxPrefetchPages is hit. +// A silently truncated map would cause runAction to emit duplicate grants +// with wrong source attribution for any principal whose grants landed on a +// dropped page (see comment on ErrPrefetchPageCapExceeded). +func TestPrefetchDescendantGrants_PageCapExceeded(t *testing.T) { + store := &pageCapStubStore{MockExpanderStore: *NewMockExpanderStore()} + ent := v2.Entitlement_builder{Id: "entitlement:1"}.Build() + + result, err := prefetchDescendantGrants(context.Background(), store, ent) + require.Nil(t, result) + require.Error(t, err) + require.ErrorIs(t, err, ErrPrefetchPageCapExceeded) + require.Equal(t, maxPrefetchPages, store.calls, "loop must iterate exactly the cap before erroring") +} From 2b6f25f2a77c265bb30881156149e59b76bf1233 Mon Sep 17 00:00:00 2001 From: arreyder Date: Wed, 27 May 2026 08:22:01 -0500 Subject: [PATCH 5/5] review: bound prefetch memory, dedupe in-page same-principal grants MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Replaces the full-grant prefetch with a principal-key set so memory scales with the number of distinct descendant principals (~80B/key) instead of full grant payloads (~2KB each). When a source grant's principal is not in the set, runAction skips the per-principal SQL query and creates a new grant directly — preserving the fast path for the Lilly scenario where the descendant has no existing grants. When the principal is in the set, runAction issues one per-principal query to fetch and merge. Other review fixes: - Lower maxPrefetchPages from 10000 → 100; on cap exceeded, log a warning and fall back to per-principal queries (no error). Very large c1zs now run slowly instead of OOMing or failing outright. - Seed the per-page descendant cache with newly created grants so two same-principal source grants in one page merge into one expanded grant instead of producing a duplicate (same deterministic grant ID would otherwise upsert and lose source attribution). Addresses review comments on #863. Co-Authored-By: Claude Opus 4.7 (1M context) --- pkg/sync/expand/expander.go | 131 +++++++++++++++++++++++++------ pkg/sync/expand/expander_test.go | 76 +++++++++++++++--- 2 files changed, 172 insertions(+), 35 deletions(-) diff --git a/pkg/sync/expand/expander.go b/pkg/sync/expand/expander.go index 7e0930d84..39b6047ef 100644 --- a/pkg/sync/expand/expander.go +++ b/pkg/sync/expand/expander.go @@ -148,29 +148,39 @@ func (e *Expander) IsDone(ctx context.Context) bool { return e.graph.IsExpanded() } -const maxPrefetchPages = 10000 - -// ErrPrefetchPageCapExceeded means the prefetch hit maxPrefetchPages without -// reaching the last page. Returning a partial map is unsafe — runAction would -// see the missing principals as "no existing descendant grant" and emit a -// fresh grant instead of merging sources, producing duplicate grants with -// wrong source attribution. Caller should surface this rather than continue. -var ErrPrefetchPageCapExceeded = errors.New("descendant-grant prefetch exceeded maxPrefetchPages cap") +// maxPrefetchPages bounds the streaming scan that builds the principal-key +// set for the descendant entitlement. Keys are ~80 bytes each, so 100 pages +// of 10000 rows is a ~80MB ceiling per action. Exceeding the cap is not an +// error — the caller drops the partial set and falls back to per-principal +// queries (slower, but bounded and correct). +const maxPrefetchPages = 100 func descendantGrantKey(resourceTypeID, resourceID string) string { return resourceTypeID + "\x00" + resourceID } -func prefetchDescendantGrants( +// prefetchDescendantPrincipals streams the descendant entitlement's grants +// and returns the set of principal keys that have at least one grant. We +// keep only keys (not full grants) so memory scales with the number of +// distinct principals on the descendant, not the total grant payload size. +// +// The bool return reports whether the scan completed: +// - true: the set is exhaustive; absence from the set means the descendant +// definitely has no grant for that principal, so runAction can skip the +// per-principal query and create a new grant directly. +// - false: the page cap was exceeded. The partial set is unsafe as a +// negative oracle — a missing key could be a true absence or just past +// the cap — so runAction must fall back to per-principal queries. +func prefetchDescendantPrincipals( ctx context.Context, store ExpanderStore, entitlement *v2.Entitlement, -) (map[string][]*v2.Grant, error) { - result := make(map[string][]*v2.Grant) +) (map[string]struct{}, bool, error) { + set := make(map[string]struct{}) pageToken := "" for page := 0; page < maxPrefetchPages; page++ { if err := ctx.Err(); err != nil { - return nil, err + return nil, false, err } resp, err := store.ListGrantsForEntitlement(ctx, reader_v2.GrantsReaderServiceListGrantsForEntitlementRequest_builder{ @@ -178,20 +188,51 @@ func prefetchDescendantGrants( PageToken: pageToken, }.Build()) if err != nil { - return nil, fmt.Errorf("prefetchDescendantGrants: %w", err) + return nil, false, fmt.Errorf("prefetchDescendantPrincipals: %w", err) } for _, g := range resp.GetList() { - principal := g.GetPrincipal().GetId() - key := descendantGrantKey(principal.GetResourceType(), principal.GetResource()) - result[key] = append(result[key], g) + pid := g.GetPrincipal().GetId() + set[descendantGrantKey(pid.GetResourceType(), pid.GetResource())] = struct{}{} } pageToken = resp.GetNextPageToken() if pageToken == "" { - return result, nil + return set, true, nil + } + } + return set, false, nil +} + +// listDescendantGrantsForPrincipal pages through ListGrantsForEntitlement +// scoped to a single principal. Used by runAction when the key set says +// (or doesn't know whether) the descendant has grants for this principal, +// so we need the full grants to merge sources. +func listDescendantGrantsForPrincipal( + ctx context.Context, + store ExpanderStore, + entitlement *v2.Entitlement, + principalID *v2.ResourceId, +) ([]*v2.Grant, error) { + var out []*v2.Grant + pageToken := "" + for { + if err := ctx.Err(); err != nil { + return nil, err + } + resp, err := store.ListGrantsForEntitlement(ctx, + reader_v2.GrantsReaderServiceListGrantsForEntitlementRequest_builder{ + Entitlement: entitlement, + PrincipalId: principalID, + PageToken: pageToken, + }.Build()) + if err != nil { + return nil, fmt.Errorf("listDescendantGrantsForPrincipal: %w", err) + } + out = append(out, resp.GetList()...) + pageToken = resp.GetNextPageToken() + if pageToken == "" { + return out, nil } } - return nil, fmt.Errorf("prefetchDescendantGrants: %w (entitlement=%s, pages=%d)", - ErrPrefetchPageCapExceeded, entitlement.GetId(), maxPrefetchPages) } // runAction processes a single action and returns the next page token. @@ -254,12 +295,31 @@ func (e *Expander) runAction(ctx context.Context, action *EntitlementGraphAction return "", fmt.Errorf("runAction: error fetching source grants: %w", err) } - descendantByPrincipal, err := prefetchDescendantGrants(ctx, e.store, descendantEntitlement.GetEntitlement()) + existingPrincipals, complete, err := prefetchDescendantPrincipals(ctx, e.store, descendantEntitlement.GetEntitlement()) if err != nil { - l.Error("runAction: error prefetching descendant grants", zap.Error(err)) - return "", fmt.Errorf("runAction: error prefetching descendant grants: %w", err) + l.Error("runAction: error prefetching descendant principals", zap.Error(err)) + return "", fmt.Errorf("runAction: error prefetching descendant principals: %w", err) + } + if !complete { + // Partial set is unsafe as a negative oracle; drop it and pay the + // per-principal cost for every source grant. Slow but bounded. + l.Warn("runAction: descendant-grant prefetch cap exceeded; falling back to per-principal queries", + zap.String("descendant_entitlement_id", action.DescendantEntitlementID), + zap.Int("prefetch_page_cap", maxPrefetchPages)) + existingPrincipals = nil } - span.SetAttributes(attribute.Int("descendant_prefetch_count", len(descendantByPrincipal))) + span.SetAttributes( + attribute.Int("descendant_prefetch_set_size", len(existingPrincipals)), + attribute.Bool("descendant_prefetch_complete", complete), + ) + + // Per-page cache of descendant grants by principal key. Populated lazily + // from the store on the first hit for a key, and seeded with newly + // created grants so a second source grant for the same principal in the + // same page merges into the in-memory grant instead of producing a + // duplicate (same deterministic grant ID would otherwise upsert and lose + // the first iteration's source attribution). + descendantsByKey := make(map[string][]*v2.Grant) var newGrants = make([]*v2.Grant, 0) for _, sourceGrant := range sourceGrants.GetList() { @@ -282,7 +342,27 @@ func (e *Expander) runAction(ctx context.Context, action *EntitlementGraphAction principal := sourceGrant.GetPrincipal().GetId() key := descendantGrantKey(principal.GetResourceType(), principal.GetResource()) - descendantGrants := descendantByPrincipal[key] + + descendantGrants, cached := descendantsByKey[key] + if !cached { + // Fast path: if the key-set is complete and this principal is + // not in it, we know there is no existing descendant grant + // without issuing a SQL query. + needQuery := true + if existingPrincipals != nil { + if _, ok := existingPrincipals[key]; !ok { + needQuery = false + } + } + if needQuery { + descendantGrants, err = listDescendantGrantsForPrincipal(ctx, e.store, descendantEntitlement.GetEntitlement(), principal) + if err != nil { + l.Error("runAction: error fetching descendant grants", zap.Error(err)) + return "", fmt.Errorf("runAction: error fetching descendant grants: %w", err) + } + } + descendantsByKey[key] = descendantGrants + } if len(descendantGrants) == 0 { descendantGrant, err := newExpandedGrant(descendantEntitlement.GetEntitlement(), sourceGrant.GetPrincipal(), action.SourceEntitlementID, isSourceDirect) @@ -291,6 +371,9 @@ func (e *Expander) runAction(ctx context.Context, action *EntitlementGraphAction return "", fmt.Errorf("runAction: error creating new grant: %w", err) } newGrants = append(newGrants, descendantGrant) + // Seed the cache so a later source grant for the same principal + // in this page merges into this grant rather than re-creating it. + descendantsByKey[key] = []*v2.Grant{descendantGrant} } else { for _, descendantGrant := range descendantGrants { sourcesMap := descendantGrant.GetSources().GetSources() diff --git a/pkg/sync/expand/expander_test.go b/pkg/sync/expand/expander_test.go index 5000aaaa4..171ca50c6 100644 --- a/pkg/sync/expand/expander_test.go +++ b/pkg/sync/expand/expander_test.go @@ -473,18 +473,72 @@ func (s *pageCapStubStore) ListGrantsForEntitlement( }.Build(), nil } -// TestPrefetchDescendantGrants_PageCapExceeded locks in that the prefetch -// errors instead of returning a partial map when maxPrefetchPages is hit. -// A silently truncated map would cause runAction to emit duplicate grants -// with wrong source attribution for any principal whose grants landed on a -// dropped page (see comment on ErrPrefetchPageCapExceeded). -func TestPrefetchDescendantGrants_PageCapExceeded(t *testing.T) { +// TestPrefetchDescendantPrincipals_PageCapExceeded locks in graceful +// degradation when the cap is hit: the helper must return complete=false +// (and no error) so runAction can drop the partial set and fall back to +// per-principal queries. Returning an error here would make the expander +// fail on very large descendant entitlements instead of just running slowly. +func TestPrefetchDescendantPrincipals_PageCapExceeded(t *testing.T) { store := &pageCapStubStore{MockExpanderStore: *NewMockExpanderStore()} ent := v2.Entitlement_builder{Id: "entitlement:1"}.Build() - result, err := prefetchDescendantGrants(context.Background(), store, ent) - require.Nil(t, result) - require.Error(t, err) - require.ErrorIs(t, err, ErrPrefetchPageCapExceeded) - require.Equal(t, maxPrefetchPages, store.calls, "loop must iterate exactly the cap before erroring") + set, complete, err := prefetchDescendantPrincipals(context.Background(), store, ent) + require.NoError(t, err) + require.False(t, complete, "cap exceeded must report incomplete so caller falls back") + require.NotNil(t, set) + require.Equal(t, maxPrefetchPages, store.calls, "loop must iterate exactly the cap before giving up") +} + +// TestExpanderInPageDuplicatePrincipal covers the case where multiple source +// grants in the same source-grants page share the same principal and the +// descendant entitlement has no pre-existing grant. The expander must emit +// exactly one expanded grant per (descendant, principal) — not one per +// source grant — because the deterministic grant ID would otherwise cause +// upsert collisions and lose source attribution. +func TestExpanderInPageDuplicatePrincipal(t *testing.T) { + ctx := context.Background() + store := NewMockExpanderStore() + + groupResource := makeResource("group", "team") + userResource := makeResource("user", "alice") + + entA := makeEntitlement("ent:a", groupResource) + entB := makeEntitlement("ent:b", groupResource) + + store.AddEntitlement(entA) + store.AddEntitlement(entB) + + // Two source grants on entA for the same principal (same ResourceId). + // This can happen when a connector emits multiple membership grants for + // the same user (e.g. nested-group membership traversal). + store.AddGrant(makeGrant("grant:alice:a:1", entA, userResource)) + store.AddGrant(makeGrant("grant:alice:a:2", entA, userResource)) + + graph := NewEntitlementGraph(ctx) + graph.AddEntitlementID(entA.GetId()) + graph.AddEntitlementID(entB.GetId()) + require.NoError(t, graph.AddEdge(ctx, entA.GetId(), entB.GetId(), false, []string{"user"})) + + expander := NewExpander(store, graph) + require.NoError(t, expander.Run(ctx)) + + // Count distinct expanded grants on entB for alice. + distinct := make(map[string]*v2.Grant) + for _, g := range store.GetPutGrants() { + if g.GetEntitlement().GetId() != entB.GetId() { + continue + } + if g.GetPrincipal().GetId().GetResource() != "alice" { + continue + } + distinct[g.GetId()] = g + } + require.Len(t, distinct, 1, "two same-principal source grants must produce exactly one expanded grant") + + var only *v2.Grant + for _, g := range distinct { + only = g + } + sources := only.GetSources().GetSources() + require.Contains(t, sources, entA.GetId(), "source attribution must be preserved") }