diff --git a/pkg/sync/expand/expander.go b/pkg/sync/expand/expander.go index 2018a3f28..7e0930d84 100644 --- a/pkg/sync/expand/expander.go +++ b/pkg/sync/expand/expander.go @@ -148,6 +148,52 @@ func (e *Expander) IsDone(ctx context.Context) bool { return e.graph.IsExpanded() } +const maxPrefetchPages = 10000 + +// ErrPrefetchPageCapExceeded means the prefetch hit maxPrefetchPages without +// reaching the last page. Returning a partial map is unsafe — runAction would +// see the missing principals as "no existing descendant grant" and emit a +// fresh grant instead of merging sources, producing duplicate grants with +// wrong source attribution. Caller should surface this rather than continue. +var ErrPrefetchPageCapExceeded = errors.New("descendant-grant prefetch exceeded maxPrefetchPages cap") + +func descendantGrantKey(resourceTypeID, resourceID string) string { + return resourceTypeID + "\x00" + resourceID +} + +func prefetchDescendantGrants( + ctx context.Context, + store ExpanderStore, + entitlement *v2.Entitlement, +) (map[string][]*v2.Grant, error) { + result := make(map[string][]*v2.Grant) + pageToken := "" + for page := 0; page < maxPrefetchPages; page++ { + if err := ctx.Err(); err != nil { + return nil, err + } + resp, err := store.ListGrantsForEntitlement(ctx, + reader_v2.GrantsReaderServiceListGrantsForEntitlementRequest_builder{ + Entitlement: entitlement, + PageToken: pageToken, + }.Build()) + if err != nil { + return nil, fmt.Errorf("prefetchDescendantGrants: %w", err) + } + for _, g := range resp.GetList() { + principal := g.GetPrincipal().GetId() + key := descendantGrantKey(principal.GetResourceType(), principal.GetResource()) + result[key] = append(result[key], g) + } + pageToken = resp.GetNextPageToken() + if pageToken == "" { + return result, nil + } + } + return nil, fmt.Errorf("prefetchDescendantGrants: %w (entitlement=%s, pages=%d)", + ErrPrefetchPageCapExceeded, entitlement.GetId(), maxPrefetchPages) +} + // runAction processes a single action and returns the next page token. // If the returned page token is empty, the action is complete. // @@ -208,65 +254,44 @@ func (e *Expander) runAction(ctx context.Context, action *EntitlementGraphAction return "", fmt.Errorf("runAction: error fetching source grants: %w", err) } + descendantByPrincipal, err := prefetchDescendantGrants(ctx, e.store, descendantEntitlement.GetEntitlement()) + if err != nil { + l.Error("runAction: error prefetching descendant grants", zap.Error(err)) + return "", fmt.Errorf("runAction: error prefetching descendant grants: %w", err) + } + span.SetAttributes(attribute.Int("descendant_prefetch_count", len(descendantByPrincipal))) + var newGrants = make([]*v2.Grant, 0) for _, sourceGrant := range sourceGrants.GetList() { - // If this is a shallow action, then we only want to expand grants that have no sources - // which indicates that it was directly assigned. + if sourceGrant.GetPrincipal() == nil { + continue + } if action.Shallow { sourcesMap := sourceGrant.GetSources().GetSources() - // If we have no sources, this is a direct grant foundDirectGrant := len(sourcesMap) == 0 - // If the source grant has sources, then we need to see if any of them are the source entitlement itself if sourcesMap[action.SourceEntitlementID] != nil { foundDirectGrant = true } - - // This is not a direct grant, so skip it since we are a shallow action if !foundDirectGrant { continue } } - // Determine if the source grant is direct: either it has no sources (never expanded), - // or it has a self-reference (direct grant that was also expanded). sgSources := sourceGrant.GetSources().GetSources() isSourceDirect := len(sgSources) == 0 || sgSources[action.SourceEntitlementID] != nil - // Unroll all grants for the principal on the descendant entitlement. - pageToken := "" - for { - req := reader_v2.GrantsReaderServiceListGrantsForEntitlementRequest_builder{ - Entitlement: descendantEntitlement.GetEntitlement(), - PrincipalId: sourceGrant.GetPrincipal().GetId(), - PageToken: pageToken, - Annotations: nil, - }.Build() + principal := sourceGrant.GetPrincipal().GetId() + key := descendantGrantKey(principal.GetResourceType(), principal.GetResource()) + descendantGrants := descendantByPrincipal[key] - resp, err := e.store.ListGrantsForEntitlement(ctx, req) + if len(descendantGrants) == 0 { + descendantGrant, err := newExpandedGrant(descendantEntitlement.GetEntitlement(), sourceGrant.GetPrincipal(), action.SourceEntitlementID, isSourceDirect) if err != nil { - l.Error("runAction: error fetching descendant grants", zap.Error(err)) - return "", fmt.Errorf("runAction: error fetching descendant grants: %w", err) - } - descendantGrants := resp.GetList() - - // If we have no grants for the principal in the descendant entitlement, make one. - if pageToken == "" && resp.GetNextPageToken() == "" && len(descendantGrants) == 0 { - descendantGrant, err := newExpandedGrant(descendantEntitlement.GetEntitlement(), sourceGrant.GetPrincipal(), action.SourceEntitlementID, isSourceDirect) - if err != nil { - l.Error("runAction: error creating new grant", zap.Error(err)) - return "", fmt.Errorf("runAction: error creating new grant: %w", err) - } - newGrants = append(newGrants, descendantGrant) - newGrants, err = PutGrantsInChunks(ctx, e.store, newGrants, 10000) - if err != nil { - l.Error("runAction: error updating descendant grants", zap.Error(err)) - return "", fmt.Errorf("runAction: error updating descendant grants: %w", err) - } - break + l.Error("runAction: error creating new grant", zap.Error(err)) + return "", fmt.Errorf("runAction: error creating new grant: %w", err) } - - // Add the source entitlement as a source to all descendant grants. - grantsToUpdate := make([]*v2.Grant, 0) + newGrants = append(newGrants, descendantGrant) + } else { for _, descendantGrant := range descendantGrants { sourcesMap := descendantGrant.GetSources().GetSources() if sourcesMap == nil { @@ -274,13 +299,10 @@ func (e *Expander) runAction(ctx context.Context, action *EntitlementGraphAction } updated := false - if len(sourcesMap) == 0 { - // If we are already granted this entitlement, make sure to add ourselves as a source. sourcesMap[action.DescendantEntitlementID] = &v2.GrantSources_GrantSource{IsDirect: true} updated = true } - // Include the source grant as a source. if sourcesMap[action.SourceEntitlementID] == nil { sourcesMap[action.SourceEntitlementID] = &v2.GrantSources_GrantSource{IsDirect: isSourceDirect} updated = true @@ -289,21 +311,15 @@ func (e *Expander) runAction(ctx context.Context, action *EntitlementGraphAction if updated { sources := v2.GrantSources_builder{Sources: sourcesMap}.Build() descendantGrant.SetSources(sources) - grantsToUpdate = append(grantsToUpdate, descendantGrant) + newGrants = append(newGrants, descendantGrant) } } - newGrants = append(newGrants, grantsToUpdate...) - - newGrants, err = PutGrantsInChunks(ctx, e.store, newGrants, 10000) - if err != nil { - l.Error("runAction: error updating descendant grants", zap.Error(err)) - return "", fmt.Errorf("runAction: error updating descendant grants: %w", err) - } + } - pageToken = resp.GetNextPageToken() - if pageToken == "" { - break - } + newGrants, err = PutGrantsInChunks(ctx, e.store, newGrants, 10000) + if err != nil { + l.Error("runAction: error updating descendant grants", zap.Error(err)) + return "", fmt.Errorf("runAction: error updating descendant grants: %w", err) } } diff --git a/pkg/sync/expand/expander_test.go b/pkg/sync/expand/expander_test.go index 3c0b508a1..5000aaaa4 100644 --- a/pkg/sync/expand/expander_test.go +++ b/pkg/sync/expand/expander_test.go @@ -454,3 +454,37 @@ func TestExpanderMixedDirectness(t *testing.T) { require.Contains(t, sourcesC, entB.GetId()) require.False(t, sourcesC[entB.GetId()].GetIsDirect(), "source B should be transitive") } + +// pageCapStubStore always returns a non-empty NextPageToken so the prefetch +// loop never terminates naturally. Used to exercise the maxPrefetchPages cap. +type pageCapStubStore struct { + MockExpanderStore + calls int +} + +func (s *pageCapStubStore) ListGrantsForEntitlement( + _ context.Context, + _ *reader_v2.GrantsReaderServiceListGrantsForEntitlementRequest, +) (*reader_v2.GrantsReaderServiceListGrantsForEntitlementResponse, error) { + s.calls++ + return reader_v2.GrantsReaderServiceListGrantsForEntitlementResponse_builder{ + List: []*v2.Grant{}, + NextPageToken: "more", + }.Build(), nil +} + +// TestPrefetchDescendantGrants_PageCapExceeded locks in that the prefetch +// errors instead of returning a partial map when maxPrefetchPages is hit. +// A silently truncated map would cause runAction to emit duplicate grants +// with wrong source attribution for any principal whose grants landed on a +// dropped page (see comment on ErrPrefetchPageCapExceeded). +func TestPrefetchDescendantGrants_PageCapExceeded(t *testing.T) { + store := &pageCapStubStore{MockExpanderStore: *NewMockExpanderStore()} + ent := v2.Entitlement_builder{Id: "entitlement:1"}.Build() + + result, err := prefetchDescendantGrants(context.Background(), store, ent) + require.Nil(t, result) + require.Error(t, err) + require.ErrorIs(t, err, ErrPrefetchPageCapExceeded) + require.Equal(t, maxPrefetchPages, store.calls, "loop must iterate exactly the cap before erroring") +}