From f9c4eba45f50028b50589c741183198a21e76475 Mon Sep 17 00:00:00 2001
From: Markus Wieland
Date: Tue, 24 Mar 2026 14:09:39 +0100
Subject: [PATCH 1/2] Add multicluster support for the event recorder

---
 .../filter_weigher_pipeline_controller.go |   2 +-
 .../filter_weigher_pipeline_controller.go |   2 +-
 .../filter_weigher_pipeline_controller.go |   2 +-
 .../filter_weigher_pipeline_controller.go |   2 +-
 .../filter_weigher_pipeline_controller.go |   2 +-
 .../reservations/failover/controller.go   |   2 +-
 pkg/multicluster/client_test.go           |  18 +-
 pkg/multicluster/recorder.go              |  83 ++++++
 pkg/multicluster/recorder_test.go         | 256 ++++++++++++++++++
 9 files changed, 361 insertions(+), 8 deletions(-)
 create mode 100644 pkg/multicluster/recorder.go
 create mode 100644 pkg/multicluster/recorder_test.go

diff --git a/internal/scheduling/cinder/filter_weigher_pipeline_controller.go b/internal/scheduling/cinder/filter_weigher_pipeline_controller.go
index 7163cc5a6..52ec37306 100644
--- a/internal/scheduling/cinder/filter_weigher_pipeline_controller.go
+++ b/internal/scheduling/cinder/filter_weigher_pipeline_controller.go
@@ -148,7 +148,7 @@ func (c *FilterWeigherPipelineController) InitPipeline(
 func (c *FilterWeigherPipelineController) SetupWithManager(mgr manager.Manager, mcl *multicluster.Client) error {
 	c.Initializer = c
 	c.SchedulingDomain = v1alpha1.SchedulingDomainCinder
-	c.HistoryManager = lib.HistoryClient{Client: mcl, Recorder: mgr.GetEventRecorder("cortex-cinder-scheduler")}
+	c.HistoryManager = lib.HistoryClient{Client: mcl, Recorder: mcl.GetEventRecorder("cortex-cinder-scheduler")}
 	if err := mgr.Add(manager.RunnableFunc(c.InitAllPipelines)); err != nil {
 		return err
 	}
diff --git a/internal/scheduling/machines/filter_weigher_pipeline_controller.go b/internal/scheduling/machines/filter_weigher_pipeline_controller.go
index d76203812..35d51708a 100644
--- a/internal/scheduling/machines/filter_weigher_pipeline_controller.go
+++ b/internal/scheduling/machines/filter_weigher_pipeline_controller.go
@@ -222,7 +222,7 @@ func (c *FilterWeigherPipelineController) handleMachine() handler.EventHandler {
 func (c *FilterWeigherPipelineController) SetupWithManager(mgr manager.Manager, mcl *multicluster.Client) error {
 	c.Initializer = c
 	c.SchedulingDomain = v1alpha1.SchedulingDomainMachines
-	c.HistoryManager = lib.HistoryClient{Client: mcl, Recorder: mgr.GetEventRecorder("cortex-machines-scheduler")}
+	c.HistoryManager = lib.HistoryClient{Client: mcl, Recorder: mcl.GetEventRecorder("cortex-machines-scheduler")}
 	if err := mgr.Add(manager.RunnableFunc(c.InitAllPipelines)); err != nil {
 		return err
 	}
diff --git a/internal/scheduling/manila/filter_weigher_pipeline_controller.go b/internal/scheduling/manila/filter_weigher_pipeline_controller.go
index 6ab938511..128b7d719 100644
--- a/internal/scheduling/manila/filter_weigher_pipeline_controller.go
+++ b/internal/scheduling/manila/filter_weigher_pipeline_controller.go
@@ -148,7 +148,7 @@ func (c *FilterWeigherPipelineController) InitPipeline(
 func (c *FilterWeigherPipelineController) SetupWithManager(mgr manager.Manager, mcl *multicluster.Client) error {
 	c.Initializer = c
 	c.SchedulingDomain = v1alpha1.SchedulingDomainManila
-	c.HistoryManager = lib.HistoryClient{Client: mcl, Recorder: mgr.GetEventRecorder("cortex-manila-scheduler")}
+	c.HistoryManager = lib.HistoryClient{Client: mcl, Recorder: mcl.GetEventRecorder("cortex-manila-scheduler")}
 	if err := mgr.Add(manager.RunnableFunc(c.InitAllPipelines)); err != nil {
 		return err
 	}
diff --git a/internal/scheduling/nova/filter_weigher_pipeline_controller.go b/internal/scheduling/nova/filter_weigher_pipeline_controller.go
index bb9c5bb07..279ac1c3e 100644
--- a/internal/scheduling/nova/filter_weigher_pipeline_controller.go
+++ b/internal/scheduling/nova/filter_weigher_pipeline_controller.go
@@ -199,7 +199,7 @@ func (c *FilterWeigherPipelineController) InitPipeline(
 func (c *FilterWeigherPipelineController) SetupWithManager(mgr manager.Manager, mcl *multicluster.Client) error {
 	c.Initializer = c
 	c.SchedulingDomain = v1alpha1.SchedulingDomainNova
-	c.HistoryManager = lib.HistoryClient{Client: mcl, Recorder: mgr.GetEventRecorder("cortex-nova-scheduler")}
+	c.HistoryManager = lib.HistoryClient{Client: mcl, Recorder: mcl.GetEventRecorder("cortex-nova-scheduler")}
 	c.gatherer = &candidateGatherer{Client: mcl}
 	if err := mgr.Add(manager.RunnableFunc(c.InitAllPipelines)); err != nil {
 		return err
diff --git a/internal/scheduling/pods/filter_weigher_pipeline_controller.go b/internal/scheduling/pods/filter_weigher_pipeline_controller.go
index ceba977b8..0ceee6485 100644
--- a/internal/scheduling/pods/filter_weigher_pipeline_controller.go
+++ b/internal/scheduling/pods/filter_weigher_pipeline_controller.go
@@ -234,7 +234,7 @@ func (c *FilterWeigherPipelineController) handlePod() handler.EventHandler {
 func (c *FilterWeigherPipelineController) SetupWithManager(mgr manager.Manager, mcl *multicluster.Client) error {
 	c.Initializer = c
 	c.SchedulingDomain = v1alpha1.SchedulingDomainPods
-	c.HistoryManager = lib.HistoryClient{Client: mcl, Recorder: mgr.GetEventRecorder("cortex-pods-scheduler")}
+	c.HistoryManager = lib.HistoryClient{Client: mcl, Recorder: mcl.GetEventRecorder("cortex-pods-scheduler")}
 	if err := mgr.Add(manager.RunnableFunc(c.InitAllPipelines)); err != nil {
 		return err
 	}
diff --git a/internal/scheduling/reservations/failover/controller.go b/internal/scheduling/reservations/failover/controller.go
index 4cc7d3c1e..ef3d422b7 100644
--- a/internal/scheduling/reservations/failover/controller.go
+++ b/internal/scheduling/reservations/failover/controller.go
@@ -765,7 +765,7 @@ func (c *FailoverReservationController) patchReservationStatus(ctx context.Conte
 // SetupWithManager sets up the watch-based reconciler with the Manager.
 // This handles per-reservation reconciliation triggered by CRD changes.
 func (c *FailoverReservationController) SetupWithManager(mgr ctrl.Manager, mcl *multicluster.Client) error {
-	c.Recorder = mgr.GetEventRecorder("failover-reservation-controller")
+	c.Recorder = mcl.GetEventRecorder("failover-reservation-controller")
 
 	return multicluster.BuildController(mcl, mgr).
 		For(&v1alpha1.Reservation{}).
diff --git a/pkg/multicluster/client_test.go b/pkg/multicluster/client_test.go
index 64b0ae94c..290c61e1b 100644
--- a/pkg/multicluster/client_test.go
+++ b/pkg/multicluster/client_test.go
@@ -13,6 +13,8 @@ import (
 	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
 	"k8s.io/apimachinery/pkg/runtime"
 	"k8s.io/apimachinery/pkg/runtime/schema"
+	"k8s.io/client-go/tools/events"
+	"k8s.io/client-go/tools/record"
 	ctrl "sigs.k8s.io/controller-runtime"
 	"sigs.k8s.io/controller-runtime/pkg/cache"
 	"sigs.k8s.io/controller-runtime/pkg/client"
@@ -74,8 +76,9 @@ func (f *fakeCache) getIndexFieldCalls() []indexFieldCall {
 // fakeCluster implements cluster.Cluster interface for testing.
 type fakeCluster struct {
 	cluster.Cluster
-	fakeClient client.Client
-	fakeCache  *fakeCache
+	fakeClient   client.Client
+	fakeCache    *fakeCache
+	fakeRecorder *fakeEventRecorder
 }
 
 func (f *fakeCluster) GetClient() client.Client {
@@ -86,6 +89,17 @@ func (f *fakeCluster) GetCache() cache.Cache {
 	return f.fakeCache
 }
 
+func (f *fakeCluster) GetEventRecorder(_ string) events.EventRecorder {
+	if f.fakeRecorder != nil {
+		return f.fakeRecorder
+	}
+	return &fakeEventRecorder{}
+}
+
+func (f *fakeCluster) GetEventRecorderFor(_ string) record.EventRecorder {
+	return record.NewFakeRecorder(100)
+}
+
 func newFakeCluster(scheme *runtime.Scheme, objs ...client.Object) *fakeCluster {
 	return &fakeCluster{
 		fakeClient: fake.NewClientBuilder().WithScheme(scheme).WithObjects(objs...).Build(),
diff --git a/pkg/multicluster/recorder.go b/pkg/multicluster/recorder.go
new file mode 100644
index 000000000..d36cc068d
--- /dev/null
+++ b/pkg/multicluster/recorder.go
@@ -0,0 +1,83 @@
+// Copyright SAP SE
+// SPDX-License-Identifier: Apache-2.0
+
+package multicluster
+
+import (
+	"k8s.io/apimachinery/pkg/runtime"
+	"k8s.io/client-go/tools/events"
+	ctrl "sigs.k8s.io/controller-runtime"
+	"sigs.k8s.io/controller-runtime/pkg/cluster"
+)
+
+// MultiClusterRecorder implements events.EventRecorder and routes events to the
+// correct cluster based on the GVK of the "regarding" object. It uses the same
+// routing logic as the multicluster Client's write path.
+type MultiClusterRecorder struct {
+	client       *Client
+	homeRecorder events.EventRecorder
+	recorders    map[cluster.Cluster]events.EventRecorder
+}
+
+// GetEventRecorder creates a multi-cluster-aware EventRecorder. It pre-creates
+// a per-cluster recorder for the home cluster and every remote cluster currently
+// registered in the client. The name parameter is passed through to each
+// cluster's GetEventRecorder method (it becomes the reportingController in the
+// Kubernetes Event).
+func (c *Client) GetEventRecorder(name string) events.EventRecorder {
+	homeRecorder := c.HomeCluster.GetEventRecorder(name)
+
+	recorders := make(map[cluster.Cluster]events.EventRecorder)
+	recorders[c.HomeCluster] = homeRecorder
+
+	c.remoteClustersMu.RLock()
+	defer c.remoteClustersMu.RUnlock()
+
+	for _, remotes := range c.remoteClusters {
+		for _, r := range remotes {
+			if _, exists := recorders[r.cluster]; !exists {
+				recorders[r.cluster] = r.cluster.GetEventRecorder(name)
+			}
+		}
+	}
+
+	return &MultiClusterRecorder{
+		client:       c,
+		homeRecorder: homeRecorder,
+		recorders:    recorders,
+	}
+}
+
+// Eventf routes the event to the cluster that owns the "regarding" object.
+// Falls back to the home cluster recorder if routing fails.
+func (r *MultiClusterRecorder) Eventf(regarding runtime.Object, related runtime.Object, eventtype, reason, action, note string, args ...interface{}) {
+	recorder := r.recorderFor(regarding)
+	recorder.Eventf(regarding, related, eventtype, reason, action, note, args...)
+}
+
+// recorderFor resolves which per-cluster recorder to use for the given object.
+func (r *MultiClusterRecorder) recorderFor(obj runtime.Object) events.EventRecorder {
+	if obj == nil {
+		return r.homeRecorder
+	}
+
+	gvk, err := r.client.GVKFromHomeScheme(obj)
+	if err != nil {
+		ctrl.Log.V(1).Info("multi-cluster recorder: failed to resolve GVK, using home recorder", "error", err)
+		return r.homeRecorder
+	}
+
+	cl, err := r.client.clusterForWrite(gvk, obj)
+	if err != nil {
+		ctrl.Log.V(1).Info("multi-cluster recorder: no cluster matched, using home recorder", "gvk", gvk, "error", err)
+		return r.homeRecorder
+	}
+
+	recorder, ok := r.recorders[cl]
+	if !ok {
+		ctrl.Log.V(1).Info("multi-cluster recorder: no pre-created recorder for cluster, using home recorder", "gvk", gvk)
+		return r.homeRecorder
+	}
+
+	return recorder
+}
diff --git a/pkg/multicluster/recorder_test.go b/pkg/multicluster/recorder_test.go
new file mode 100644
index 000000000..6a0888921
--- /dev/null
+++ b/pkg/multicluster/recorder_test.go
@@ -0,0 +1,256 @@
+// Copyright SAP SE
+// SPDX-License-Identifier: Apache-2.0
+
+package multicluster
+
+import (
+	"fmt"
+	"sync"
+	"testing"
+
+	"github.com/cobaltcore-dev/cortex/api/v1alpha1"
+	corev1 "k8s.io/api/core/v1"
+	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
+	"k8s.io/apimachinery/pkg/runtime"
+	"k8s.io/apimachinery/pkg/runtime/schema"
+)
+
+// fakeEventRecorder captures Eventf calls for assertions.
+type fakeEventRecorder struct {
+	mu    sync.Mutex
+	calls []eventfCall
+}
+
+type eventfCall struct {
+	regarding runtime.Object
+	eventtype string
+	reason    string
+	action    string
+	note      string
+}
+
+func (f *fakeEventRecorder) Eventf(regarding runtime.Object, _ runtime.Object, eventtype, reason, action, note string, args ...interface{}) {
+	f.mu.Lock()
+	defer f.mu.Unlock()
+	f.calls = append(f.calls, eventfCall{
+		regarding: regarding,
+		eventtype: eventtype,
+		reason:    reason,
+		action:    action,
+		note:      fmt.Sprintf(note, args...),
+	})
+}
+
+func (f *fakeEventRecorder) getCalls() []eventfCall {
+	f.mu.Lock()
+	defer f.mu.Unlock()
+	out := make([]eventfCall, len(f.calls))
+	copy(out, f.calls)
+	return out
+}
+
+func TestMultiClusterRecorder_HomeGVK(t *testing.T) {
+	scheme := newTestScheme(t)
+	homeRecorder := &fakeEventRecorder{}
+	homeCluster := &fakeCluster{
+		fakeClient:   nil,
+		fakeCache:    &fakeCache{},
+		fakeRecorder: homeRecorder,
+	}
+
+	historyGVK := schema.GroupVersionKind{Group: "cortex.cloud", Version: "v1alpha1", Kind: "History"}
+	mcl := &Client{
+		HomeCluster: homeCluster,
+		HomeScheme:  scheme,
+		homeGVKs:    map[schema.GroupVersionKind]bool{historyGVK: true},
+	}
+
+	recorder := mcl.GetEventRecorder("test-recorder")
+
+	history := &v1alpha1.History{
+		ObjectMeta: metav1.ObjectMeta{Name: "nova-uuid-1"},
+	}
+	recorder.Eventf(history, nil, corev1.EventTypeNormal, "SchedulingSucceeded", "Scheduled", "selected host: %s", "compute-1")
+
+	calls := homeRecorder.getCalls()
+	if len(calls) != 1 {
+		t.Fatalf("expected 1 call, got %d", len(calls))
+	}
+	if calls[0].eventtype != corev1.EventTypeNormal {
+		t.Errorf("expected event type %q, got %q", corev1.EventTypeNormal, calls[0].eventtype)
+	}
+	if calls[0].reason != "SchedulingSucceeded" {
+		t.Errorf("expected reason %q, got %q", "SchedulingSucceeded", calls[0].reason)
+	}
+	if calls[0].note != "selected host: compute-1" {
+		t.Errorf("expected note %q, got %q", "selected host: compute-1", calls[0].note)
+	}
+}
+
+func TestMultiClusterRecorder_RemoteGVK(t *testing.T) {
+	scheme := newTestScheme(t)
+	homeRecorder := &fakeEventRecorder{}
+	remoteRecorder := &fakeEventRecorder{}
+
+	homeCluster := &fakeCluster{
+		fakeClient:   nil,
+		fakeCache:    &fakeCache{},
+		fakeRecorder: homeRecorder,
+	}
+	remote := &fakeCluster{
+		fakeClient:   nil,
+		fakeCache:    &fakeCache{},
+		fakeRecorder: remoteRecorder,
+	}
+
+	reservationGVK := schema.GroupVersionKind{Group: "cortex.cloud", Version: "v1alpha1", Kind: "Reservation"}
+	mcl := &Client{
+		HomeCluster: homeCluster,
+		HomeScheme:  scheme,
+		ResourceRouters: map[schema.GroupVersionKind]ResourceRouter{
+			reservationGVK: ReservationsResourceRouter{},
+		},
+		remoteClusters: map[schema.GroupVersionKind][]remoteCluster{
+			reservationGVK: {{cluster: remote, labels: map[string]string{"availabilityZone": "az-a"}}},
+		},
+	}
+
+	recorder := mcl.GetEventRecorder("test-recorder")
+
+	res := &v1alpha1.Reservation{
+		ObjectMeta: metav1.ObjectMeta{Name: "res-1"},
+		Spec:       v1alpha1.ReservationSpec{AvailabilityZone: "az-a"},
+	}
+	recorder.Eventf(res, nil, corev1.EventTypeNormal, "ValidationPassed", "Validated", "reservation validated")
+
+	// Event should go to the remote recorder, not home.
+	homeCalls := homeRecorder.getCalls()
+	if len(homeCalls) != 0 {
+		t.Errorf("expected 0 home calls, got %d", len(homeCalls))
+	}
+	remoteCalls := remoteRecorder.getCalls()
+	if len(remoteCalls) != 1 {
+		t.Fatalf("expected 1 remote call, got %d", len(remoteCalls))
+	}
+	if remoteCalls[0].action != "Validated" {
+		t.Errorf("expected action %q, got %q", "Validated", remoteCalls[0].action)
+	}
+}
+
+func TestMultiClusterRecorder_MultipleRemotes(t *testing.T) {
+	scheme := newTestScheme(t)
+	homeRecorder := &fakeEventRecorder{}
+	remoteARecorder := &fakeEventRecorder{}
+	remoteBRecorder := &fakeEventRecorder{}
+
+	homeCluster := &fakeCluster{fakeRecorder: homeRecorder, fakeCache: &fakeCache{}}
+	remoteA := &fakeCluster{fakeRecorder: remoteARecorder, fakeCache: &fakeCache{}}
+	remoteB := &fakeCluster{fakeRecorder: remoteBRecorder, fakeCache: &fakeCache{}}
+
+	reservationGVK := schema.GroupVersionKind{Group: "cortex.cloud", Version: "v1alpha1", Kind: "Reservation"}
+	mcl := &Client{
+		HomeCluster: homeCluster,
+		HomeScheme:  scheme,
+		ResourceRouters: map[schema.GroupVersionKind]ResourceRouter{
+			reservationGVK: ReservationsResourceRouter{},
+		},
+		remoteClusters: map[schema.GroupVersionKind][]remoteCluster{
+			reservationGVK: {
+				{cluster: remoteA, labels: map[string]string{"availabilityZone": "az-a"}},
+				{cluster: remoteB, labels: map[string]string{"availabilityZone": "az-b"}},
+			},
+		},
+	}
+
+	recorder := mcl.GetEventRecorder("test-recorder")
+
+	// Event for az-b should go to remoteB.
+	res := &v1alpha1.Reservation{
+		ObjectMeta: metav1.ObjectMeta{Name: "res-b"},
+		Spec:       v1alpha1.ReservationSpec{AvailabilityZone: "az-b"},
+	}
+	recorder.Eventf(res, nil, corev1.EventTypeWarning, "SchedulingFailed", "FailedScheduling", "no host found")
+
+	if len(remoteARecorder.getCalls()) != 0 {
+		t.Errorf("expected 0 calls to remote-a, got %d", len(remoteARecorder.getCalls()))
+	}
+	if len(remoteBRecorder.getCalls()) != 1 {
+		t.Fatalf("expected 1 call to remote-b, got %d", len(remoteBRecorder.getCalls()))
+	}
+	if remoteBRecorder.getCalls()[0].reason != "SchedulingFailed" {
+		t.Errorf("expected reason %q, got %q", "SchedulingFailed", remoteBRecorder.getCalls()[0].reason)
+	}
+}
+
+func TestMultiClusterRecorder_FallbackOnUnknownGVK(t *testing.T) {
+	scheme := newTestScheme(t)
+	homeRecorder := &fakeEventRecorder{}
+	homeCluster := &fakeCluster{fakeRecorder: homeRecorder, fakeCache: &fakeCache{}}
+
+	mcl := &Client{
+		HomeCluster: homeCluster,
+		HomeScheme:  scheme,
+		homeGVKs:    map[schema.GroupVersionKind]bool{},
+	}
+
+	recorder := mcl.GetEventRecorder("test-recorder")
+
+	// unknownType is not in the scheme — should fall back to home recorder.
+	obj := &unknownType{ObjectMeta: metav1.ObjectMeta{Name: "unknown-1"}}
+	recorder.Eventf(obj, nil, corev1.EventTypeNormal, "Test", "Test", "test message")
+
+	if len(homeRecorder.getCalls()) != 1 {
+		t.Fatalf("expected 1 home call on fallback, got %d", len(homeRecorder.getCalls()))
+	}
+}
+
+func TestMultiClusterRecorder_FallbackOnNilRegarding(t *testing.T) {
+	scheme := newTestScheme(t)
+	homeRecorder := &fakeEventRecorder{}
+	homeCluster := &fakeCluster{fakeRecorder: homeRecorder, fakeCache: &fakeCache{}}
+
+	mcl := &Client{
+		HomeCluster: homeCluster,
+		HomeScheme:  scheme,
+	}
+
+	recorder := mcl.GetEventRecorder("test-recorder")
+	recorder.Eventf(nil, nil, corev1.EventTypeNormal, "Test", "Test", "nil regarding")
+
+	if len(homeRecorder.getCalls()) != 1 {
+		t.Fatalf("expected 1 home call for nil regarding, got %d", len(homeRecorder.getCalls()))
+	}
+}
+
+func TestMultiClusterRecorder_ConcurrentEventf(t *testing.T) {
+	scheme := newTestScheme(t)
+	homeRecorder := &fakeEventRecorder{}
+	homeCluster := &fakeCluster{fakeRecorder: homeRecorder, fakeCache: &fakeCache{}}
+
+	historyGVK := schema.GroupVersionKind{Group: "cortex.cloud", Version: "v1alpha1", Kind: "History"}
+	mcl := &Client{
+		HomeCluster: homeCluster,
+		HomeScheme:  scheme,
+		homeGVKs:    map[schema.GroupVersionKind]bool{historyGVK: true},
+	}
+
+	recorder := mcl.GetEventRecorder("test-recorder")
+
+	const goroutines = 20
+	var wg sync.WaitGroup
+	wg.Add(goroutines)
+	for i := 0; i < goroutines; i++ {
+		go func(n int) {
+			defer wg.Done()
+			history := &v1alpha1.History{
+				ObjectMeta: metav1.ObjectMeta{Name: fmt.Sprintf("history-%d", n)},
+			}
+			recorder.Eventf(history, nil, corev1.EventTypeNormal, "Test", "Test", "event %d", n)
+		}(i)
+	}
+	wg.Wait()
+
+	if len(homeRecorder.getCalls()) != goroutines {
+		t.Errorf("expected %d calls, got %d", goroutines, len(homeRecorder.getCalls()))
+	}
+}

From 9ba897ad0b156af58d29dc8579beaa91f8de780f Mon Sep 17 00:00:00 2001
From: Markus Wieland
Date: Tue, 24 Mar 2026 15:26:17 +0100
Subject: [PATCH 2/2] Refactor Eventf method signatures to use 'any' type for
 variadic arguments

---
 pkg/multicluster/recorder.go      | 2 +-
 pkg/multicluster/recorder_test.go | 4 ++--
 2 files changed, 3 insertions(+), 3 deletions(-)

diff --git a/pkg/multicluster/recorder.go b/pkg/multicluster/recorder.go
index d36cc068d..8c7c78d3d 100644
--- a/pkg/multicluster/recorder.go
+++ b/pkg/multicluster/recorder.go
@@ -50,7 +50,7 @@ func (c *Client) GetEventRecorder(name string) events.EventRecorder {
 
 // Eventf routes the event to the cluster that owns the "regarding" object.
 // Falls back to the home cluster recorder if routing fails.
-func (r *MultiClusterRecorder) Eventf(regarding runtime.Object, related runtime.Object, eventtype, reason, action, note string, args ...interface{}) {
+func (r *MultiClusterRecorder) Eventf(regarding, related runtime.Object, eventtype, reason, action, note string, args ...any) {
 	recorder := r.recorderFor(regarding)
 	recorder.Eventf(regarding, related, eventtype, reason, action, note, args...)
 }
diff --git a/pkg/multicluster/recorder_test.go b/pkg/multicluster/recorder_test.go
index 6a0888921..b23216e2e 100644
--- a/pkg/multicluster/recorder_test.go
+++ b/pkg/multicluster/recorder_test.go
@@ -29,7 +29,7 @@ type eventfCall struct {
 	note      string
 }
 
-func (f *fakeEventRecorder) Eventf(regarding runtime.Object, _ runtime.Object, eventtype, reason, action, note string, args ...interface{}) {
+func (f *fakeEventRecorder) Eventf(regarding, _ runtime.Object, eventtype, reason, action, note string, args ...any) {
 	f.mu.Lock()
 	defer f.mu.Unlock()
 	f.calls = append(f.calls, eventfCall{
@@ -239,7 +239,7 @@ func TestMultiClusterRecorder_ConcurrentEventf(t *testing.T) {
 	const goroutines = 20
 	var wg sync.WaitGroup
 	wg.Add(goroutines)
-	for i := 0; i < goroutines; i++ {
+	for i := range goroutines {
 		go func(n int) {
 			defer wg.Done()
 			history := &v1alpha1.History{
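
For reviewers, a minimal sketch of how a controller would consume the new recorder. Only mcl.GetEventRecorder, the Eventf signature, and the v1alpha1 types are taken from the series itself; the ExampleReconciler type, the "example-controller" name, and the pkg/multicluster import path are illustrative assumptions, not part of these patches.

package example

import (
	corev1 "k8s.io/api/core/v1"
	"k8s.io/client-go/tools/events"
	ctrl "sigs.k8s.io/controller-runtime"

	"github.com/cobaltcore-dev/cortex/api/v1alpha1"
	// Assumed import path for the multicluster package shown in the patches.
	"github.com/cobaltcore-dev/cortex/pkg/multicluster"
)

// ExampleReconciler is a hypothetical controller used only to illustrate the wiring.
type ExampleReconciler struct {
	Recorder events.EventRecorder
}

func (r *ExampleReconciler) SetupWithManager(mgr ctrl.Manager, mcl *multicluster.Client) error {
	// One recorder per controller; the name becomes the reportingController on
	// the emitted Events. The real controllers also wire up watches via
	// multicluster.BuildController, which is omitted here.
	r.Recorder = mcl.GetEventRecorder("example-controller")
	return nil
}

func (r *ExampleReconciler) emit(res *v1alpha1.Reservation) {
	// The event is routed by the GVK (and router labels) of the "regarding"
	// object; it falls back to the home cluster when no remote cluster matches.
	r.Recorder.Eventf(res, nil, corev1.EventTypeNormal, "ValidationPassed", "Validated",
		"reservation %s validated", res.Name)
}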