From 78f8c8922c351dd31dc976c444cbc1bd4b347a39 Mon Sep 17 00:00:00 2001 From: Patrick Zhao Date: Mon, 18 May 2026 16:03:10 +0800 Subject: [PATCH 1/2] disable secret, replicset, event cache in k8s informer Signed-off-by: Patrick Zhao --- pkg/tool/clientmanager/kube.go | 18 ++++++++++++++++-- 1 file changed, 16 insertions(+), 2 deletions(-) diff --git a/pkg/tool/clientmanager/kube.go b/pkg/tool/clientmanager/kube.go index 401c8dcd87..70e2d731f6 100644 --- a/pkg/tool/clientmanager/kube.go +++ b/pkg/tool/clientmanager/kube.go @@ -29,6 +29,8 @@ import ( kruise "github.com/openkruise/kruise-api/apps/v1alpha1" "github.com/pkg/errors" istioClient "istio.io/client-go/pkg/clientset/versioned" + appsv1 "k8s.io/api/apps/v1" + corev1 "k8s.io/api/core/v1" "k8s.io/apimachinery/pkg/runtime" "k8s.io/client-go/informers" "k8s.io/client-go/kubernetes" @@ -38,6 +40,7 @@ import ( "k8s.io/client-go/tools/remotecommand" metricsV1Beta1 "k8s.io/metrics/pkg/client/clientset/versioned/typed/metrics/v1beta1" ctrl "sigs.k8s.io/controller-runtime" + controllerRuntimeCache "sigs.k8s.io/controller-runtime/pkg/cache" controllerRuntimeClient "sigs.k8s.io/controller-runtime/pkg/client" controllerRuntimeCluster "sigs.k8s.io/controller-runtime/pkg/cluster" @@ -332,13 +335,16 @@ func (cm *KubeClientManager) GetInformer(clusterID, namespace string) (informers return client.(informers.SharedInformerFactory), nil } - opts := informers.WithNamespace(namespace) + opts := []informers.SharedInformerOption{ + informers.WithNamespace(namespace), + informers.WithTransform(controllerRuntimeCache.TransformStripManagedFields()), + } clientset, err := cm.GetKubernetesClientSet(clusterID) if err != nil { return nil, err } - informerFactory := informers.NewSharedInformerFactoryWithOptions(clientset, time.Minute, opts) + informerFactory := informers.NewSharedInformerFactoryWithOptions(clientset, time.Minute, opts...) // register the resources to be watched informerFactory.Apps().V1().Deployments().Lister() informerFactory.Apps().V1().StatefulSets().Lister() @@ -626,6 +632,14 @@ func createControllerRuntimeCluster(restConfig *rest.Config) (controllerRuntimeC c, err := controllerRuntimeCluster.New(restConfig, func(clusterOptions *controllerRuntimeCluster.Options) { clusterOptions.Scheme = scheme + clusterOptions.Cache.DefaultTransform = controllerRuntimeCache.TransformStripManagedFields() + clusterOptions.Client.Cache = &controllerRuntimeClient.CacheOptions{ + DisableFor: []controllerRuntimeClient.Object{ + &corev1.Secret{}, + &corev1.Event{}, + &appsv1.ReplicaSet{}, + }, + } }) if err != nil { return nil, errors.Wrap(err, "unable to init client") From 9bc0ed92749ca09ee1f6d72c7eda79edaba17e50 Mon Sep 17 00:00:00 2001 From: Patrick Zhao Date: Tue, 19 May 2026 11:18:05 +0800 Subject: [PATCH 2/2] manage informer lifecycle with idle timeout Signed-off-by: Patrick Zhao --- pkg/config/config.go | 4 + .../jobcontroller/job_deploy.go | 4 +- .../jobcontroller/job_freestyle.go | 7 +- .../jobcontroller/job_mse_gray_offline.go | 4 +- .../jobcontroller/job_mse_gray_release.go | 4 +- pkg/setting/consts.go | 1 + pkg/tool/clientmanager/kube.go | 207 ++++++++++++++---- 7 files changed, 189 insertions(+), 42 deletions(-) diff --git a/pkg/config/config.go b/pkg/config/config.go index b410f7f803..e52a933d5b 100644 --- a/pkg/config/config.go +++ b/pkg/config/config.go @@ -316,6 +316,10 @@ func DisableKubeClientKeepAlive() bool { return viper.GetBool(setting.ENVDisableKubeClientKeepAlive) } +func KubeInformerIdleTimeoutMinutes() int { + return viper.GetInt(setting.ENVKubeInformerIdleTimeout) +} + func IsDocumentDB() bool { return viper.GetBool(setting.ENVIsDocumentDB) } diff --git a/pkg/microservice/aslan/core/common/service/workflowcontroller/jobcontroller/job_deploy.go b/pkg/microservice/aslan/core/common/service/workflowcontroller/jobcontroller/job_deploy.go index ff159928f7..4b8e835dbd 100644 --- a/pkg/microservice/aslan/core/common/service/workflowcontroller/jobcontroller/job_deploy.go +++ b/pkg/microservice/aslan/core/common/service/workflowcontroller/jobcontroller/job_deploy.go @@ -158,12 +158,14 @@ func (c *DeployJobCtl) run(ctx context.Context) error { return errors.New(msg) } - c.informer, err = clientmanager.NewKubeClientManager().GetInformer(c.jobTaskSpec.ClusterID, c.namespace) + var releaseInformer func() + c.informer, releaseInformer, err = clientmanager.NewKubeClientManager().AcquireInformer(c.jobTaskSpec.ClusterID, c.namespace) if err != nil { msg := fmt.Sprintf("can't init k8s informer: %v", err) logError(c.job, msg, c.logger) return errors.New(msg) } + defer releaseInformer() c.istioClient, err = clientmanager.NewKubeClientManager().GetIstioClientSet(c.jobTaskSpec.ClusterID) if err != nil { diff --git a/pkg/microservice/aslan/core/common/service/workflowcontroller/jobcontroller/job_freestyle.go b/pkg/microservice/aslan/core/common/service/workflowcontroller/jobcontroller/job_freestyle.go index 73c5526221..729e209574 100644 --- a/pkg/microservice/aslan/core/common/service/workflowcontroller/jobcontroller/job_freestyle.go +++ b/pkg/microservice/aslan/core/common/service/workflowcontroller/jobcontroller/job_freestyle.go @@ -73,6 +73,7 @@ type FreestyleJobCtl struct { kubeclient crClient.Client informer informers.SharedInformerFactory apiServer crClient.Reader + releaseInf func() paths *string jobTaskSpec *commonmodels.JobTaskFreestyleSpec ack func() @@ -130,6 +131,9 @@ func (c *FreestyleJobCtl) Run(ctx context.Context) { if err := c.run(ctx); err != nil { return } + if c.releaseInf != nil { + defer c.releaseInf() + } c.wait(ctx) c.complete(ctx) } @@ -299,11 +303,12 @@ func (c *FreestyleJobCtl) run(ctx context.Context) error { } // set informer when job and cm have been created - informer, err := clientmanager.NewKubeClientManager().GetInformer(c.jobTaskSpec.Properties.ClusterID, c.jobTaskSpec.Properties.Namespace) + informer, releaseInformer, err := clientmanager.NewKubeClientManager().AcquireInformer(c.jobTaskSpec.Properties.ClusterID, c.jobTaskSpec.Properties.Namespace) if err != nil { return errors.Wrap(err, "get informer") } c.informer = informer + c.releaseInf = releaseInformer c.logger.Infof("succeed to create job %s", c.job.K8sJobName) return nil } diff --git a/pkg/microservice/aslan/core/common/service/workflowcontroller/jobcontroller/job_mse_gray_offline.go b/pkg/microservice/aslan/core/common/service/workflowcontroller/jobcontroller/job_mse_gray_offline.go index 3274aa5fb9..7dda803c93 100644 --- a/pkg/microservice/aslan/core/common/service/workflowcontroller/jobcontroller/job_mse_gray_offline.go +++ b/pkg/microservice/aslan/core/common/service/workflowcontroller/jobcontroller/job_mse_gray_offline.go @@ -97,12 +97,14 @@ func (c *MseGrayOfflineJobCtl) Run(ctx context.Context) { logError(c.job, msg, c.logger) return } - c.informer, err = clientmanager.NewKubeClientManager().GetInformer(clusterID, c.jobTaskSpec.Namespace) + var releaseInformer func() + c.informer, releaseInformer, err = clientmanager.NewKubeClientManager().AcquireInformer(clusterID, c.jobTaskSpec.Namespace) if err != nil { msg := fmt.Sprintf("can't init k8s informer: %v", err) logError(c.job, msg, c.logger) return } + defer releaseInformer() selector := labels.Set{ types.ZadigReleaseTypeLabelKey: types.ZadigReleaseTypeMseGray, types.ZadigReleaseVersionLabelKey: c.jobTaskSpec.GrayTag, diff --git a/pkg/microservice/aslan/core/common/service/workflowcontroller/jobcontroller/job_mse_gray_release.go b/pkg/microservice/aslan/core/common/service/workflowcontroller/jobcontroller/job_mse_gray_release.go index 1a0596a650..059eb991b3 100644 --- a/pkg/microservice/aslan/core/common/service/workflowcontroller/jobcontroller/job_mse_gray_release.go +++ b/pkg/microservice/aslan/core/common/service/workflowcontroller/jobcontroller/job_mse_gray_release.go @@ -101,12 +101,14 @@ func (c *MseGrayReleaseJobCtl) Run(ctx context.Context) { return } - c.informer, err = clientmanager.NewKubeClientManager().GetInformer(clusterID, c.namespace) + var releaseInformer func() + c.informer, releaseInformer, err = clientmanager.NewKubeClientManager().AcquireInformer(clusterID, c.namespace) if err != nil { msg := fmt.Sprintf("can't init k8s informer: %v", err) logError(c.job, msg, c.logger) return } + defer releaseInformer() resources := make([]*unstructured.Unstructured, 0) service := c.jobTaskSpec.GrayService diff --git a/pkg/setting/consts.go b/pkg/setting/consts.go index 7c5c104940..0bbcb7d840 100644 --- a/pkg/setting/consts.go +++ b/pkg/setting/consts.go @@ -48,6 +48,7 @@ const ( ENVPodIP = "BE_POD_IP" ENVNamespace = "BE_POD_NAMESPACE" ENVDisableKubeClientKeepAlive = "DISABLE_KUBE_CLIENT_KEEP_ALIVE" + ENVKubeInformerIdleTimeout = "KUBE_INFORMER_IDLE_TIMEOUT_MINUTES" // Aslan ENVLogLevel = "LOG_LEVEL" diff --git a/pkg/tool/clientmanager/kube.go b/pkg/tool/clientmanager/kube.go index 70e2d731f6..fb6fe3f0e6 100644 --- a/pkg/tool/clientmanager/kube.go +++ b/pkg/tool/clientmanager/kube.go @@ -59,6 +59,22 @@ var stopContext = ctrl.SetupSignalHandler() // TODO: Implement a Zadig-Kubernetes client interface, forbid business code to access these clients directly +const ( + defaultInformerIdleTimeout = 60 * time.Minute + informerReaperInterval = 5 * time.Minute + informerSyncTimeout = 10 * time.Second +) + +type informerEntry struct { + clusterID string + namespace string + factory informers.SharedInformerFactory + stopCh chan struct{} + lastUsed time.Time + refCount int + closed bool +} + type KubeClientManager struct { controllerRuntimeClusterMap sync.Map kubernetesClientSetMap sync.Map @@ -66,9 +82,9 @@ type KubeClientManager struct { metricsClientMap sync.Map istioClientSetMap sync.Map - informerStopChanMap sync.Map - informerFactoryMap sync.Map - informerMutex sync.Mutex + informerEntryMap sync.Map + informerMutex sync.Mutex + informerReaper sync.Once generalMutex sync.Mutex } @@ -327,21 +343,88 @@ func (cm *KubeClientManager) GetIstioClientSet(clusterID string) (*istioClient.C func (cm *KubeClientManager) GetInformer(clusterID, namespace string) (informers.SharedInformerFactory, error) { clusterID = handleClusterID(clusterID) - key := generateInformerKey(clusterID, namespace) + cm.startInformerReaper() - client, ok := cm.informerFactoryMap.Load(key) - if ok { - return client.(informers.SharedInformerFactory), nil + cm.informerMutex.Lock() + if entry, ok := cm.loadInformerEntryLocked(key); ok { + entry.lastUsed = time.Now() + factory := entry.factory + cm.informerMutex.Unlock() + return factory, nil } + cm.informerMutex.Unlock() + informerFactory, stopCh, err := cm.createInformerFactory(clusterID, namespace) + if err != nil { + return nil, err + } + + cm.informerMutex.Lock() + defer cm.informerMutex.Unlock() + if entry, ok := cm.loadInformerEntryLocked(key); ok { + close(stopCh) + entry.lastUsed = time.Now() + return entry.factory, nil + } + + cm.informerEntryMap.Store(key, &informerEntry{ + clusterID: clusterID, + namespace: namespace, + factory: informerFactory, + stopCh: stopCh, + lastUsed: time.Now(), + }) + + return informerFactory, nil +} + +func (cm *KubeClientManager) AcquireInformer(clusterID, namespace string) (informers.SharedInformerFactory, func(), error) { + clusterID = handleClusterID(clusterID) + + factory, err := cm.GetInformer(clusterID, namespace) + if err != nil { + return nil, nil, err + } + + key := generateInformerKey(clusterID, namespace) + cm.informerMutex.Lock() + entry, ok := cm.loadInformerEntryLocked(key) + if !ok { + cm.informerMutex.Unlock() + return nil, nil, fmt.Errorf("informer %s is not registered", key) + } + entry.refCount++ + entry.lastUsed = time.Now() + cm.informerMutex.Unlock() + + var releaseOnce sync.Once + release := func() { + releaseOnce.Do(func() { + cm.informerMutex.Lock() + defer cm.informerMutex.Unlock() + entry, ok := cm.loadInformerEntryLocked(key) + if !ok { + return + } + if entry.refCount > 0 { + entry.refCount-- + } + entry.lastUsed = time.Now() + }) + } + + return factory, release, nil +} + +func (cm *KubeClientManager) createInformerFactory(clusterID, namespace string) (informers.SharedInformerFactory, chan struct{}, error) { opts := []informers.SharedInformerOption{ informers.WithNamespace(namespace), informers.WithTransform(controllerRuntimeCache.TransformStripManagedFields()), } clientset, err := cm.GetKubernetesClientSet(clusterID) if err != nil { - return nil, err + return nil, nil, err } informerFactory := informers.NewSharedInformerFactoryWithOptions(clientset, time.Minute, opts...) @@ -354,7 +437,7 @@ func (cm *KubeClientManager) GetInformer(clusterID, namespace string) (informers informerFactory.Batch().V1().Jobs().Lister() versionInfo, err := clientset.Discovery().ServerVersion() if err != nil { - return nil, err + return nil, nil, err } // if less than v1.22.0, then we look for the extensions/v1beta1 ingress @@ -371,41 +454,26 @@ func (cm *KubeClientManager) GetInformer(clusterID, namespace string) (informers informerFactory.Batch().V1().CronJobs().Lister() } - cm.informerMutex.Lock() - defer cm.informerMutex.Unlock() - - stopchan := make(chan struct{}) - informerFactory.Start(stopchan) + stopCh := make(chan struct{}) + informerFactory.Start(stopCh) // wait for the cache to be synced for the first time, with a timeout to // prevent blocking indefinitely when the cluster is unreachable or RBAC // prevents list/watch operations. - syncCtx, syncCancel := context.WithTimeout(context.Background(), 10*time.Second) + syncCtx, syncCancel := context.WithTimeout(context.Background(), informerSyncTimeout) defer syncCancel() informerFactory.WaitForCacheSync(syncCtx.Done()) - oldStopChan, ok := cm.informerStopChanMap.Load(key) - if ok { - close(oldStopChan.(chan struct{})) - cm.informerStopChanMap.Delete(key) - } - - cm.informerStopChanMap.Store(key, stopchan) - cm.informerFactoryMap.Store(key, informerFactory) - - return informerFactory, nil + return informerFactory, stopCh, nil } func (cm *KubeClientManager) DeleteInformer(clusterID, namespace string) { + clusterID = handleClusterID(clusterID) cm.informerMutex.Lock() defer cm.informerMutex.Unlock() - oldStopChan, ok := cm.informerStopChanMap.Load(generateInformerKey(clusterID, namespace)) - if ok { - close(oldStopChan.(chan struct{})) - cm.informerStopChanMap.Delete(generateInformerKey(clusterID, namespace)) - } - - cm.informerFactoryMap.Delete(generateInformerKey(clusterID, namespace)) + cm.deleteInformerEntriesLocked(func(entry *informerEntry) bool { + return entry.clusterID == clusterID && entry.namespace == namespace + }) } // GetSPDYExecutor does not return singleton since this kind of client is not commonly reused. @@ -490,6 +558,8 @@ func (cm *KubeClientManager) GetRestConfig(clusterID string) (*rest.Config, erro } func (cm *KubeClientManager) Clear(clusterID string) error { + clusterID = handleClusterID(clusterID) + cm.generalMutex.Lock() defer cm.generalMutex.Unlock() @@ -508,12 +578,11 @@ func (cm *KubeClientManager) Clear(clusterID string) error { } for _, env := range envs { - stopchan, ok := cm.informerStopChanMap.Load(generateInformerKey(clusterID, env.Namespace)) - if ok { - close(stopchan.(chan struct{})) - } - cm.informerStopChanMap.Delete(generateInformerKey(clusterID, env.Namespace)) - cm.informerFactoryMap.Delete(generateInformerKey(clusterID, env.Namespace)) + cm.informerMutex.Lock() + cm.deleteInformerEntriesLocked(func(entry *informerEntry) bool { + return entry.clusterID == clusterID && entry.namespace == env.Namespace + }) + cm.informerMutex.Unlock() } return nil @@ -652,6 +721,68 @@ func generateInformerKey(clusterID, namespace string) string { return fmt.Sprintf(setting.InformerNamingConvention, clusterID, namespace) } +func (cm *KubeClientManager) loadInformerEntryLocked(key string) (*informerEntry, bool) { + value, ok := cm.informerEntryMap.Load(key) + if !ok { + return nil, false + } + entry := value.(*informerEntry) + if entry.closed { + cm.informerEntryMap.Delete(key) + return nil, false + } + return entry, true +} + +func (cm *KubeClientManager) startInformerReaper() { + cm.informerReaper.Do(func() { + go func() { + ticker := time.NewTicker(informerReaperInterval) + defer ticker.Stop() + for { + select { + case <-ticker.C: + cm.reapIdleInformers() + case <-stopContext.Done(): + return + } + } + }() + }) +} + +func (cm *KubeClientManager) reapIdleInformers() { + idleTimeout := informerIdleTimeout() + now := time.Now() + + cm.informerMutex.Lock() + defer cm.informerMutex.Unlock() + + cm.deleteInformerEntriesLocked(func(entry *informerEntry) bool { + return entry.refCount == 0 && now.Sub(entry.lastUsed) > idleTimeout + }) +} + +func (cm *KubeClientManager) deleteInformerEntriesLocked(shouldDelete func(*informerEntry) bool) { + cm.informerEntryMap.Range(func(key, value interface{}) bool { + entry := value.(*informerEntry) + if !entry.closed && shouldDelete(entry) { + close(entry.stopCh) + entry.closed = true + cm.informerEntryMap.Delete(key) + } + return true + }) +} + +func informerIdleTimeout() time.Duration { + minutes := config.KubeInformerIdleTimeoutMinutes() + if minutes <= 0 { + return defaultInformerIdleTimeout + } + return time.Duration(minutes) * time.Minute +} + // disableKeepAlive configures REST config to not keep connections alive func disableKeepAlive(cfg *rest.Config) { if !config.DisableKubeClientKeepAlive() {