Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -148,7 +148,7 @@ func (c *FilterWeigherPipelineController) InitPipeline(
func (c *FilterWeigherPipelineController) SetupWithManager(mgr manager.Manager, mcl *multicluster.Client) error {
c.Initializer = c
c.SchedulingDomain = v1alpha1.SchedulingDomainCinder
c.HistoryManager = lib.HistoryClient{Client: mcl, Recorder: mgr.GetEventRecorder("cortex-cinder-scheduler")}
c.HistoryManager = lib.HistoryClient{Client: mcl, Recorder: mcl.GetEventRecorder("cortex-cinder-scheduler")}
if err := mgr.Add(manager.RunnableFunc(c.InitAllPipelines)); err != nil {
return err
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -222,7 +222,7 @@ func (c *FilterWeigherPipelineController) handleMachine() handler.EventHandler {
func (c *FilterWeigherPipelineController) SetupWithManager(mgr manager.Manager, mcl *multicluster.Client) error {
c.Initializer = c
c.SchedulingDomain = v1alpha1.SchedulingDomainMachines
c.HistoryManager = lib.HistoryClient{Client: mcl, Recorder: mgr.GetEventRecorder("cortex-machines-scheduler")}
c.HistoryManager = lib.HistoryClient{Client: mcl, Recorder: mcl.GetEventRecorder("cortex-machines-scheduler")}
if err := mgr.Add(manager.RunnableFunc(c.InitAllPipelines)); err != nil {
return err
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -148,7 +148,7 @@ func (c *FilterWeigherPipelineController) InitPipeline(
func (c *FilterWeigherPipelineController) SetupWithManager(mgr manager.Manager, mcl *multicluster.Client) error {
c.Initializer = c
c.SchedulingDomain = v1alpha1.SchedulingDomainManila
c.HistoryManager = lib.HistoryClient{Client: mcl, Recorder: mgr.GetEventRecorder("cortex-manila-scheduler")}
c.HistoryManager = lib.HistoryClient{Client: mcl, Recorder: mcl.GetEventRecorder("cortex-manila-scheduler")}
if err := mgr.Add(manager.RunnableFunc(c.InitAllPipelines)); err != nil {
return err
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -199,7 +199,7 @@ func (c *FilterWeigherPipelineController) InitPipeline(
func (c *FilterWeigherPipelineController) SetupWithManager(mgr manager.Manager, mcl *multicluster.Client) error {
c.Initializer = c
c.SchedulingDomain = v1alpha1.SchedulingDomainNova
c.HistoryManager = lib.HistoryClient{Client: mcl, Recorder: mgr.GetEventRecorder("cortex-nova-scheduler")}
c.HistoryManager = lib.HistoryClient{Client: mcl, Recorder: mcl.GetEventRecorder("cortex-nova-scheduler")}
c.gatherer = &candidateGatherer{Client: mcl}
if err := mgr.Add(manager.RunnableFunc(c.InitAllPipelines)); err != nil {
return err
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -234,7 +234,7 @@ func (c *FilterWeigherPipelineController) handlePod() handler.EventHandler {
func (c *FilterWeigherPipelineController) SetupWithManager(mgr manager.Manager, mcl *multicluster.Client) error {
c.Initializer = c
c.SchedulingDomain = v1alpha1.SchedulingDomainPods
c.HistoryManager = lib.HistoryClient{Client: mcl, Recorder: mgr.GetEventRecorder("cortex-pods-scheduler")}
c.HistoryManager = lib.HistoryClient{Client: mcl, Recorder: mcl.GetEventRecorder("cortex-pods-scheduler")}
if err := mgr.Add(manager.RunnableFunc(c.InitAllPipelines)); err != nil {
return err
}
Expand Down
2 changes: 1 addition & 1 deletion internal/scheduling/reservations/failover/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -765,7 +765,7 @@ func (c *FailoverReservationController) patchReservationStatus(ctx context.Conte
// SetupWithManager sets up the watch-based reconciler with the Manager.
// This handles per-reservation reconciliation triggered by CRD changes.
func (c *FailoverReservationController) SetupWithManager(mgr ctrl.Manager, mcl *multicluster.Client) error {
c.Recorder = mgr.GetEventRecorder("failover-reservation-controller")
c.Recorder = mcl.GetEventRecorder("failover-reservation-controller")

return multicluster.BuildController(mcl, mgr).
For(&v1alpha1.Reservation{}).
Expand Down
18 changes: 16 additions & 2 deletions pkg/multicluster/client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,8 @@ import (
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/runtime/schema"
"k8s.io/client-go/tools/events"
"k8s.io/client-go/tools/record"
ctrl "sigs.k8s.io/controller-runtime"
"sigs.k8s.io/controller-runtime/pkg/cache"
"sigs.k8s.io/controller-runtime/pkg/client"
Expand Down Expand Up @@ -74,8 +76,9 @@ func (f *fakeCache) getIndexFieldCalls() []indexFieldCall {
// fakeCluster implements cluster.Cluster interface for testing.
// The embedded cluster.Cluster satisfies the full interface; only the
// methods exercised by the tests are overridden via the fields below.
type fakeCluster struct {
	cluster.Cluster
	fakeClient   client.Client      // returned by GetClient
	fakeCache    *fakeCache         // returned by GetCache
	fakeRecorder *fakeEventRecorder // returned by GetEventRecorder when non-nil
}

func (f *fakeCluster) GetClient() client.Client {
Expand All @@ -86,6 +89,17 @@ func (f *fakeCluster) GetCache() cache.Cache {
return f.fakeCache
}

// GetEventRecorder returns the injected fake recorder when one was set,
// otherwise a throwaway fakeEventRecorder so callers always get a usable value.
func (f *fakeCluster) GetEventRecorder(_ string) events.EventRecorder {
	if f.fakeRecorder == nil {
		return &fakeEventRecorder{}
	}
	return f.fakeRecorder
}

// GetEventRecorderFor returns a fresh legacy record.EventRecorder fake with a
// 100-event buffer. NOTE(review): a new recorder is created on every call, so
// events recorded through this method cannot be inspected by tests afterwards.
func (f *fakeCluster) GetEventRecorderFor(_ string) record.EventRecorder {
	return record.NewFakeRecorder(100)
}

func newFakeCluster(scheme *runtime.Scheme, objs ...client.Object) *fakeCluster {
return &fakeCluster{
fakeClient: fake.NewClientBuilder().WithScheme(scheme).WithObjects(objs...).Build(),
Expand Down
83 changes: 83 additions & 0 deletions pkg/multicluster/recorder.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,83 @@
// Copyright SAP SE
// SPDX-License-Identifier: Apache-2.0

package multicluster

import (
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/client-go/tools/events"
ctrl "sigs.k8s.io/controller-runtime"
"sigs.k8s.io/controller-runtime/pkg/cluster"
)

// MultiClusterRecorder implements events.EventRecorder and routes events to the
// correct cluster based on the GVK of the "regarding" object. It uses the same
// routing logic as the multicluster Client's write path.
type MultiClusterRecorder struct {
	client       *Client                                  // resolves GVKs and picks the write-target cluster
	homeRecorder events.EventRecorder                     // fallback whenever routing fails
	recorders    map[cluster.Cluster]events.EventRecorder // per-cluster recorders, pre-created in GetEventRecorder
}

// GetEventRecorder creates a multi-cluster-aware EventRecorder. It pre-creates
// a per-cluster recorder for the home cluster and every remote cluster currently
// registered in the client. The name parameter is passed through to each
// cluster's GetEventRecorder method (it becomes the reportingController in the
// Kubernetes Event).
func (c *Client) GetEventRecorder(name string) events.EventRecorder {
	home := c.HomeCluster.GetEventRecorder(name)
	perCluster := map[cluster.Cluster]events.EventRecorder{c.HomeCluster: home}

	// Snapshot the remote clusters registered right now; clusters added later
	// will fall back to the home recorder at routing time.
	c.remoteClustersMu.RLock()
	for _, remotes := range c.remoteClusters {
		for _, remote := range remotes {
			if _, seen := perCluster[remote.cluster]; seen {
				continue
			}
			perCluster[remote.cluster] = remote.cluster.GetEventRecorder(name)
		}
	}
	c.remoteClustersMu.RUnlock()

	return &MultiClusterRecorder{
		client:       c,
		homeRecorder: home,
		recorders:    perCluster,
	}
}

// Eventf routes the event to the cluster that owns the "regarding" object.
// Falls back to the home cluster recorder if routing fails.
func (r *MultiClusterRecorder) Eventf(regarding, related runtime.Object, eventtype, reason, action, note string, args ...any) {
	r.recorderFor(regarding).Eventf(regarding, related, eventtype, reason, action, note, args...)
}

// recorderFor resolves which per-cluster recorder to use for the given object.
// Every failure mode (nil object, unresolvable GVK, no matching cluster, no
// pre-created recorder) degrades to the home cluster recorder.
func (r *MultiClusterRecorder) recorderFor(obj runtime.Object) events.EventRecorder {
	if obj == nil {
		return r.homeRecorder
	}

	gvk, gvkErr := r.client.GVKFromHomeScheme(obj)
	if gvkErr != nil {
		ctrl.Log.V(1).Info("multi-cluster recorder: failed to resolve GVK, using home recorder", "error", gvkErr)
		return r.homeRecorder
	}

	target, routeErr := r.client.clusterForWrite(gvk, obj)
	if routeErr != nil {
		ctrl.Log.V(1).Info("multi-cluster recorder: no cluster matched, using home recorder", "gvk", gvk, "error", routeErr)
		return r.homeRecorder
	}

	if recorder, ok := r.recorders[target]; ok {
		return recorder
	}
	ctrl.Log.V(1).Info("multi-cluster recorder: no pre-created recorder for cluster, using home recorder", "gvk", gvk)
	return r.homeRecorder
}
Loading
Loading