Skip to content
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
118 changes: 63 additions & 55 deletions pkg/sync/expand/expander.go
Original file line number Diff line number Diff line change
Expand Up @@ -148,6 +148,44 @@ func (e *Expander) IsDone(ctx context.Context) bool {
return e.graph.IsExpanded()
}

const maxPrefetchPages = 10000

func descendantGrantKey(resourceTypeID, resourceID string) string {
return resourceTypeID + "\x00" + resourceID
}

func prefetchDescendantGrants(
ctx context.Context,
store ExpanderStore,
entitlement *v2.Entitlement,
) (map[string][]*v2.Grant, error) {
result := make(map[string][]*v2.Grant)
pageToken := ""
for page := 0; page < maxPrefetchPages; page++ {
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What happens if we hit maxPrefetchPages?

if err := ctx.Err(); err != nil {
return nil, err
}
resp, err := store.ListGrantsForEntitlement(ctx,
reader_v2.GrantsReaderServiceListGrantsForEntitlementRequest_builder{
Entitlement: entitlement,
PageToken: pageToken,
}.Build())
if err != nil {
return nil, fmt.Errorf("prefetchDescendantGrants: %w", err)
}
for _, g := range resp.GetList() {
principal := g.GetPrincipal().GetId()
key := descendantGrantKey(principal.GetResourceType(), principal.GetResource())
result[key] = append(result[key], g)
}
pageToken = resp.GetNextPageToken()
if pageToken == "" {
break
}
}
return result, nil
}

// runAction processes a single action and returns the next page token.
// If the returned page token is empty, the action is complete.
//
Expand Down Expand Up @@ -208,79 +246,55 @@ func (e *Expander) runAction(ctx context.Context, action *EntitlementGraphAction
return "", fmt.Errorf("runAction: error fetching source grants: %w", err)
}

descendantByPrincipal, err := prefetchDescendantGrants(ctx, e.store, descendantEntitlement.GetEntitlement())
if err != nil {
l.Error("runAction: error prefetching descendant grants", zap.Error(err))
return "", fmt.Errorf("runAction: error prefetching descendant grants: %w", err)
}
span.SetAttributes(attribute.Int("descendant_prefetch_count", len(descendantByPrincipal)))

var newGrants = make([]*v2.Grant, 0)
for _, sourceGrant := range sourceGrants.GetList() {
// If this is a shallow action, then we only want to expand grants that have no sources
// which indicates that it was directly assigned.
if sourceGrant.GetPrincipal() == nil {
continue
}
if action.Shallow {
sourcesMap := sourceGrant.GetSources().GetSources()
// If we have no sources, this is a direct grant
foundDirectGrant := len(sourcesMap) == 0
// If the source grant has sources, then we need to see if any of them are the source entitlement itself
if sourcesMap[action.SourceEntitlementID] != nil {
foundDirectGrant = true
}

// This is not a direct grant, so skip it since we are a shallow action
if !foundDirectGrant {
continue
}
}

// Determine if the source grant is direct: either it has no sources (never expanded),
// or it has a self-reference (direct grant that was also expanded).
sgSources := sourceGrant.GetSources().GetSources()
isSourceDirect := len(sgSources) == 0 || sgSources[action.SourceEntitlementID] != nil

// Unroll all grants for the principal on the descendant entitlement.
pageToken := ""
for {
req := reader_v2.GrantsReaderServiceListGrantsForEntitlementRequest_builder{
Entitlement: descendantEntitlement.GetEntitlement(),
PrincipalId: sourceGrant.GetPrincipal().GetId(),
PageToken: pageToken,
Annotations: nil,
}.Build()
principal := sourceGrant.GetPrincipal().GetId()
key := descendantGrantKey(principal.GetResourceType(), principal.GetResource())
descendantGrants := descendantByPrincipal[key]

resp, err := e.store.ListGrantsForEntitlement(ctx, req)
if len(descendantGrants) == 0 {
descendantGrant, err := newExpandedGrant(descendantEntitlement.GetEntitlement(), sourceGrant.GetPrincipal(), action.SourceEntitlementID, isSourceDirect)
if err != nil {
l.Error("runAction: error fetching descendant grants", zap.Error(err))
return "", fmt.Errorf("runAction: error fetching descendant grants: %w", err)
}
descendantGrants := resp.GetList()

// If we have no grants for the principal in the descendant entitlement, make one.
if pageToken == "" && resp.GetNextPageToken() == "" && len(descendantGrants) == 0 {
descendantGrant, err := newExpandedGrant(descendantEntitlement.GetEntitlement(), sourceGrant.GetPrincipal(), action.SourceEntitlementID, isSourceDirect)
if err != nil {
l.Error("runAction: error creating new grant", zap.Error(err))
return "", fmt.Errorf("runAction: error creating new grant: %w", err)
}
newGrants = append(newGrants, descendantGrant)
newGrants, err = PutGrantsInChunks(ctx, e.store, newGrants, 10000)
if err != nil {
l.Error("runAction: error updating descendant grants", zap.Error(err))
return "", fmt.Errorf("runAction: error updating descendant grants: %w", err)
}
break
l.Error("runAction: error creating new grant", zap.Error(err))
return "", fmt.Errorf("runAction: error creating new grant: %w", err)
}

// Add the source entitlement as a source to all descendant grants.
grantsToUpdate := make([]*v2.Grant, 0)
newGrants = append(newGrants, descendantGrant)
Comment on lines +277 to +285
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🟡 Suggestion: The descendantByPrincipal map is built once before the loop but never updated after creating new grants. If two source grants in the same page share the same principal and no pre-existing descendant grant exists, the second iteration will also see len(descendantGrants) == 0 and create a duplicate grant (same deterministic ID) instead of updating the first grant's sources. Consider inserting the newly created grant into the map:

Suggested change
descendantGrants := descendantByPrincipal[key]
resp, err := e.store.ListGrantsForEntitlement(ctx, req)
if len(descendantGrants) == 0 {
descendantGrant, err := newExpandedGrant(descendantEntitlement.GetEntitlement(), sourceGrant.GetPrincipal(), action.SourceEntitlementID, isSourceDirect)
if err != nil {
l.Error("runAction: error fetching descendant grants", zap.Error(err))
return "", fmt.Errorf("runAction: error fetching descendant grants: %w", err)
l.Error("runAction: error creating new grant", zap.Error(err))
return "", fmt.Errorf("runAction: error creating new grant: %w", err)
}
descendantGrants := resp.GetList()
// If we have no grants for the principal in the descendant entitlement, make one.
if pageToken == "" && resp.GetNextPageToken() == "" && len(descendantGrants) == 0 {
descendantGrant, err := newExpandedGrant(descendantEntitlement.GetEntitlement(), sourceGrant.GetPrincipal(), action.SourceEntitlementID, isSourceDirect)
if err != nil {
l.Error("runAction: error creating new grant", zap.Error(err))
return "", fmt.Errorf("runAction: error creating new grant: %w", err)
}
newGrants = append(newGrants, descendantGrant)
newGrants, err = PutGrantsInChunks(ctx, e.store, newGrants, 10000)
if err != nil {
l.Error("runAction: error updating descendant grants", zap.Error(err))
return "", fmt.Errorf("runAction: error updating descendant grants: %w", err)
}
break
}
// Add the source entitlement as a source to all descendant grants.
grantsToUpdate := make([]*v2.Grant, 0)
newGrants = append(newGrants, descendantGrant)
descendantGrant, err := newExpandedGrant(descendantEntitlement.GetEntitlement(), sourceGrant.GetPrincipal(), action.SourceEntitlementID, isSourceDirect)
if err != nil {
l.Error("runAction: error creating new grant", zap.Error(err))
return "", fmt.Errorf("runAction: error creating new grant: %w", err)
}
newGrants = append(newGrants, descendantGrant)
descendantByPrincipal[key] = append(descendantByPrincipal[key], descendantGrant)

} else {
for _, descendantGrant := range descendantGrants {
sourcesMap := descendantGrant.GetSources().GetSources()
if sourcesMap == nil {
sourcesMap = make(map[string]*v2.GrantSources_GrantSource)
}

updated := false

if len(sourcesMap) == 0 {
// If we are already granted this entitlement, make sure to add ourselves as a source.
sourcesMap[action.DescendantEntitlementID] = &v2.GrantSources_GrantSource{IsDirect: true}
updated = true
}
// Include the source grant as a source.
if sourcesMap[action.SourceEntitlementID] == nil {
sourcesMap[action.SourceEntitlementID] = &v2.GrantSources_GrantSource{IsDirect: isSourceDirect}
updated = true
Expand All @@ -289,21 +303,15 @@ func (e *Expander) runAction(ctx context.Context, action *EntitlementGraphAction
if updated {
sources := v2.GrantSources_builder{Sources: sourcesMap}.Build()
descendantGrant.SetSources(sources)
grantsToUpdate = append(grantsToUpdate, descendantGrant)
newGrants = append(newGrants, descendantGrant)
}
}
newGrants = append(newGrants, grantsToUpdate...)

newGrants, err = PutGrantsInChunks(ctx, e.store, newGrants, 10000)
if err != nil {
l.Error("runAction: error updating descendant grants", zap.Error(err))
return "", fmt.Errorf("runAction: error updating descendant grants: %w", err)
}
}

pageToken = resp.GetNextPageToken()
if pageToken == "" {
break
}
newGrants, err = PutGrantsInChunks(ctx, e.store, newGrants, 10000)
if err != nil {
l.Error("runAction: error updating descendant grants", zap.Error(err))
return "", fmt.Errorf("runAction: error updating descendant grants: %w", err)
}
}

Expand Down
Loading