Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
15 changes: 13 additions & 2 deletions pkg/project/apiserver/registry/project/proxy/proxy.go
Original file line number Diff line number Diff line change
Expand Up @@ -114,15 +114,26 @@ func (s *REST) Watch(ctx context.Context, options *metainternal.ListOptions) (wa
return nil, fmt.Errorf("no user")
}

includeAllExistingProjects := (options != nil) && options.ResourceVersion == "0"
// includeAllExistingProjects (RV="0") triggers sending initial state.
// sendBookmark (from SendInitialEvents) triggers sending a bookmark with
// k8s.io/initial-events-end annotation after initial events (if any).
includeAllExistingProjects, sendBookmark := false, false
if options != nil {
if options.ResourceVersion == "0" {
includeAllExistingProjects = true
}
if options.SendInitialEvents != nil && *options.SendInitialEvents {
sendBookmark = true
}
}

allowedNamespaces, err := scope.ScopesToVisibleNamespaces(userInfo.GetExtra()[authorizationapi.ScopesKey], s.authCache.GetClusterRoleLister(), true)
if err != nil {
return nil, err
}

m := projectutil.MatchProject(apihelpers.InternalListOptionsToSelectors(options))
watcher := projectauth.NewUserProjectWatcher(userInfo, allowedNamespaces, s.projectCache, s.authCache, includeAllExistingProjects, m)
watcher := projectauth.NewUserProjectWatcher(userInfo, allowedNamespaces, s.projectCache, s.authCache, includeAllExistingProjects, m, sendBookmark)
s.authCache.AddWatcher(watcher)

go watcher.Watch()
Expand Down
36 changes: 30 additions & 6 deletions pkg/project/auth/watch.go
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,8 @@ type userProjectWatcher struct {
initialProjects []corev1.Namespace
// knownProjects maps name to resourceVersion
knownProjects map[string]string

sendBookmark bool
}

var (
Expand All @@ -72,7 +74,7 @@ var (
watchChannelHWM kstorage.HighWaterMark
)

func NewUserProjectWatcher(user user.Info, visibleNamespaces sets.String, projectCache *projectcache.ProjectCache, authCache WatchableCache, includeAllExistingProjects bool, predicate kstorage.SelectionPredicate) *userProjectWatcher {
func NewUserProjectWatcher(user user.Info, visibleNamespaces sets.String, projectCache *projectcache.ProjectCache, authCache WatchableCache, includeAllExistingProjects bool, predicate kstorage.SelectionPredicate, sendBookmark bool) *userProjectWatcher {
namespaces, _ := authCache.List(user, labels.Everything())
knownProjects := map[string]string{}
for _, namespace := range namespaces.Items {
Expand All @@ -98,13 +100,17 @@ func NewUserProjectWatcher(user user.Info, visibleNamespaces sets.String, projec
authCache: authCache,
initialProjects: initialProjects,
knownProjects: knownProjects,

sendBookmark: sendBookmark,
}
w.emit = func(e watch.Event) {
// if dealing with project events, ensure that we only emit events for projects
// that match the field or label selector specified by a consumer
if project, ok := e.Object.(*projectapi.Project); ok {
if matches, err := predicate.Matches(project); err != nil || !matches {
return
if e.Type != watch.Bookmark {
// if dealing with project events, ensure that we only emit events for projects
// that match the field or label selector specified by a consumer
if project, ok := e.Object.(*projectapi.Project); ok {
if matches, err := predicate.Matches(project); err != nil || !matches {
return
}
}
}

Expand Down Expand Up @@ -186,6 +192,13 @@ func (w *userProjectWatcher) GroupMembershipChanged(namespaceName string, users,

// Watch pulls stuff from etcd, converts, and pushes out the outgoing channel. Meant to be
// called as a goroutine.
//
// Design decision: This implementation balances KEP-3157 watch-list support with backward
// compatibility. Initial events are sent only when rv="0" (includeAllExistingProjects=true).
// For other rv values with SendInitialEvents=true, only the bookmark is sent. This approach
// acknowledges that project visibility depends on both namespace objects and RBAC state. Since
// RBAC changes don't update namespace ResourceVersions, permission-filtered views cannot provide
// the same consistency guarantees (resourceVersionMatch=NotOlderThan) as direct object watches.
func (w *userProjectWatcher) Watch() {
defer close(w.outgoing)
defer func() {
Expand Down Expand Up @@ -214,6 +227,17 @@ func (w *userProjectWatcher) Watch() {
})
}

if w.sendBookmark {
w.emit(watch.Event{
Type: watch.Bookmark,
Object: &projectapi.Project{
ObjectMeta: metav1.ObjectMeta{
Annotations: map[string]string{metav1.InitialEventsAnnotationKey: "true"},
},
},
})
Comment thread
coderabbitai[bot] marked this conversation as resolved.
}

for {
select {
case err := <-w.cacheError:
Expand Down
170 changes: 164 additions & 6 deletions pkg/project/auth/watch_test.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package auth

import (
"fmt"
"testing"
"time"

Expand All @@ -23,7 +24,7 @@ import (
projectutil "github.com/openshift/openshift-apiserver/pkg/project/util"
)

func newTestWatcher(username string, groups []string, predicate storage.SelectionPredicate, namespaces ...*corev1.Namespace) (*userProjectWatcher, *fakeAuthCache, chan struct{}) {
func newTestWatcher(username string, groups []string, predicate storage.SelectionPredicate, includeAllExistingProjects bool, namespaces ...*corev1.Namespace) (*userProjectWatcher, *fakeAuthCache, chan struct{}) {
objects := []runtime.Object{}
for i := range namespaces {
objects = append(objects, namespaces[i])
Expand All @@ -37,11 +38,14 @@ func newTestWatcher(username string, groups []string, predicate storage.Selectio
"",
)
fakeAuthCache := &fakeAuthCache{}
if includeAllExistingProjects {
fakeAuthCache.namespaces = namespaces
}

stopCh := make(chan struct{})
go projectCache.Run(stopCh)

return NewUserProjectWatcher(&user.DefaultInfo{Name: username, Groups: groups}, sets.NewString("*"), projectCache, fakeAuthCache, false, predicate), fakeAuthCache, stopCh
return NewUserProjectWatcher(&user.DefaultInfo{Name: username, Groups: groups}, sets.NewString("*"), projectCache, fakeAuthCache, includeAllExistingProjects, predicate, false), fakeAuthCache, stopCh
}

type fakeAuthCache struct {
Expand All @@ -66,7 +70,7 @@ func (w *fakeAuthCache) List(userInfo user.Info, selector labels.Selector) (*cor
}

func TestFullIncoming(t *testing.T) {
watcher, fakeAuthCache, stopCh := newTestWatcher("bob", nil, matchAllPredicate(), newNamespaces("ns-01")...)
watcher, fakeAuthCache, stopCh := newTestWatcher("bob", nil, matchAllPredicate(), false, newNamespaces("ns-01")...)
defer close(stopCh)
watcher.cacheIncoming = make(chan watch.Event)

Expand Down Expand Up @@ -115,7 +119,7 @@ func TestFullIncoming(t *testing.T) {
}

func TestAddModifyDeleteEventsByUser(t *testing.T) {
watcher, _, stopCh := newTestWatcher("bob", nil, matchAllPredicate(), newNamespaces("ns-01")...)
watcher, _, stopCh := newTestWatcher("bob", nil, matchAllPredicate(), false, newNamespaces("ns-01")...)
defer close(stopCh)
go watcher.Watch()

Expand Down Expand Up @@ -158,7 +162,7 @@ func TestProjectSelectionPredicate(t *testing.T) {
field := fields.ParseSelectorOrDie("metadata.name=ns-03")
m := projectutil.MatchProject(labels.Everything(), field)

watcher, _, stopCh := newTestWatcher("bob", nil, m, newNamespaces("ns-01", "ns-02", "ns-03")...)
watcher, _, stopCh := newTestWatcher("bob", nil, m, false, newNamespaces("ns-01", "ns-02", "ns-03")...)
defer close(stopCh)

if watcher.emit == nil {
Expand Down Expand Up @@ -220,7 +224,7 @@ func TestProjectSelectionPredicate(t *testing.T) {
}

func TestAddModifyDeleteEventsByGroup(t *testing.T) {
watcher, _, stopCh := newTestWatcher("bob", []string{"group-one"}, matchAllPredicate(), newNamespaces("ns-01")...)
watcher, _, stopCh := newTestWatcher("bob", []string{"group-one"}, matchAllPredicate(), false, newNamespaces("ns-01")...)
defer close(stopCh)
go watcher.Watch()

Expand Down Expand Up @@ -271,3 +275,157 @@ func newNamespaces(names ...string) []*corev1.Namespace {
func matchAllPredicate() storage.SelectionPredicate {
return projectutil.MatchProject(labels.Everything(), fields.Everything())
}

func newNamespacesWithRV(names ...string) []*corev1.Namespace {
ret := []*corev1.Namespace{}
for i, name := range names {
ret = append(ret, &corev1.Namespace{
ObjectMeta: metav1.ObjectMeta{
Name: name,
ResourceVersion: fmt.Sprintf("%d", i+10),
},
})
}
return ret
}

func newBookmarkTestWatcher(username string, includeAllExistingProjects bool, namespaces ...*corev1.Namespace) (*userProjectWatcher, chan struct{}) {
objects := []runtime.Object{}
for i := range namespaces {
objects = append(objects, namespaces[i])
}
mockClient := fakev1.NewSimpleClientset(objects...)
informers := informersv1.NewSharedInformerFactory(mockClient, controller.NoResyncPeriodFunc())
projectCache := projectcache.NewProjectCache(
informers.Core().V1().Namespaces().Informer(),
mockClient.CoreV1().Namespaces(),
"",
)
fakeAuthCache := &fakeAuthCache{namespaces: namespaces}
stopCh := make(chan struct{})
go projectCache.Run(stopCh)
w := NewUserProjectWatcher(
&user.DefaultInfo{Name: username},
sets.NewString("*"),
projectCache,
fakeAuthCache,
includeAllExistingProjects,
matchAllPredicate(),
true,
)
return w, stopCh
}

func TestSendInitialEventsBookmark(t *testing.T) {
t.Run("with rv=0", func(t *testing.T) {
watcher, stopCh := newBookmarkTestWatcher("bob", true, newNamespacesWithRV("ns-01", "ns-02")...)
defer close(stopCh)

go watcher.Watch()

// expect 2 initial Added events
for i := 0; i < 2; i++ {
select {
case event := <-watcher.ResultChan():
if event.Type != watch.Added {
t.Errorf("expected Added, got %v", event.Type)
}
case <-time.After(3 * time.Second):
t.Fatalf("timeout waiting for initial event %d", i)
}
}

// expect bookmark with annotation
select {
case event := <-watcher.ResultChan():
if event.Type != watch.Bookmark {
t.Errorf("expected Bookmark, got %v", event.Type)
}
project := event.Object.(*projectapi.Project)
if project.Annotations[metav1.InitialEventsAnnotationKey] != "true" {
t.Errorf("expected initial-events-end annotation")
}
case <-time.After(3 * time.Second):
t.Fatalf("timeout waiting for bookmark")
}
})

t.Run("bookmark bypasses field selector predicate", func(t *testing.T) {
// Verify that bookmark events are delivered even when a field selector
// (e.g. metadata.name=<project>) would reject the bookmark's empty Name.
// This is critical for oc delete --wait which watches a specific project
// and needs the bookmark to complete the initial events stream.
field := fields.ParseSelectorOrDie("metadata.name=ns-01")
m := projectutil.MatchProject(labels.Everything(), field)

objects := []runtime.Object{}
namespaces := newNamespacesWithRV("ns-01", "ns-02")
for i := range namespaces {
objects = append(objects, namespaces[i])
}
mockClient := fakev1.NewSimpleClientset(objects...)
informers := informersv1.NewSharedInformerFactory(mockClient, controller.NoResyncPeriodFunc())
projectCache := projectcache.NewProjectCache(
informers.Core().V1().Namespaces().Informer(),
mockClient.CoreV1().Namespaces(),
"",
)
fakeAuthCache := &fakeAuthCache{namespaces: namespaces}
stopCh := make(chan struct{})
defer close(stopCh)
go projectCache.Run(stopCh)

w := NewUserProjectWatcher(
&user.DefaultInfo{Name: "bob"},
sets.NewString("*"),
projectCache,
fakeAuthCache,
false,
m,
true,
)
go w.Watch()

select {
case event := <-w.ResultChan():
if event.Type != watch.Bookmark {
t.Errorf("expected Bookmark, got %v", event.Type)
}
project := event.Object.(*projectapi.Project)
if project.Annotations[metav1.InitialEventsAnnotationKey] != "true" {
t.Errorf("expected initial-events-end annotation")
}
case <-time.After(3 * time.Second):
t.Fatalf("timeout waiting for bookmark — predicate likely filtered it out")
}
})

t.Run("without rv=0", func(t *testing.T) {
watcher, stopCh := newBookmarkTestWatcher("bob", false, newNamespacesWithRV("ns-01", "ns-02")...)
defer close(stopCh)

go watcher.Watch()

// expect bookmark with annotation immediately
select {
case event := <-watcher.ResultChan():
if event.Type != watch.Bookmark {
t.Errorf("expected Bookmark, got %v", event.Type)
}
project := event.Object.(*projectapi.Project)
if project.Annotations[metav1.InitialEventsAnnotationKey] != "true" {
t.Errorf("expected initial-events-end annotation")
}
case <-time.After(3 * time.Second):
t.Fatalf("timeout waiting for bookmark")
}

// verify no additional events
select {
case event := <-watcher.ResultChan():
t.Fatalf("unexpected event after bookmark: %v", event)
case <-time.After(500 * time.Millisecond):
// expected - no more events
}
})
}