Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
35 changes: 34 additions & 1 deletion cmd/controller/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,8 +7,10 @@ package main

import (
"context"
"maps"
"os"
"os/signal"
"strconv"
"syscall"
"time"

Expand All @@ -31,7 +33,8 @@ import (
)

const (
LeaseLockName = "capoperator-lease-lock"
LeaseLockName = "capoperator-lease-lock"
MaxConcurrentReconcilesEnvPrefix = "MAX_CONCURRENT_RECONCILES_"
)

func main() {
Expand Down Expand Up @@ -82,6 +85,9 @@ func main() {
// Initialize/start metrics server
util.InitMetricsServer()

// Get concurrency config for each resource kind from environment variables or use defaults
concurrencyConfig := getDefaultConcurrencyConfig()

// context for the reconciliation controller
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
Expand Down Expand Up @@ -133,6 +139,8 @@ func main() {
klog.InfoS("check & update of subscriptionGUID label done")

c := controller.NewController(coreClient, crdClient, istioClient, certClient, certManagerClient, dnsClient, promClient)
// Update the controller's concurrency config before starting the controller
maps.Copy(controller.DefaultConcurrentReconciles, concurrencyConfig)
go c.Start(ctx)
},
OnStoppedLeading: func() {
Expand All @@ -148,3 +156,28 @@ func main() {
},
})
}

// getDefaultConcurrencyConfig determines the worker concurrency for each
// resource kind handled by the controller. For every entry in
// controller.ResourceEnvSuffixMap the value may be overridden through an
// environment variable named MAX_CONCURRENT_RECONCILES_<SUFFIX>; otherwise
// the default from controller.DefaultConcurrentReconciles (or
// controller.DefaultReconcile when the kind has no explicit default) is used.
// Invalid or non-positive overrides are ignored so a misconfigured variable
// can never leave a queue without workers.
func getDefaultConcurrencyConfig() map[int]int {
	// inline function to resolve the concurrency for a single resource kind:
	// environment override first, falling back to the supplied default
	getConcurrencyConfigForResource := func(resourceEnvSuffix string, defaultVal int) int {
		reconcileEnv := MaxConcurrentReconcilesEnvPrefix + resourceEnvSuffix
		if val := os.Getenv(reconcileEnv); val != "" {
			// Accept only positive integers; zero would mean no workers for
			// this queue and a negative count is meaningless, so such
			// overrides fall through to the default.
			if intVal, err := strconv.Atoi(val); err == nil && intVal > 0 {
				return intVal
			}
		}
		return defaultVal
	}

	// Configure default concurrency for each resource kind, can be overridden by environment variables
	concurrencyConfig := make(map[int]int, len(controller.ResourceEnvSuffixMap))

	for resourceKey, resourceEnvSuffix := range controller.ResourceEnvSuffixMap {
		defaultReconcileForResource, ok := controller.DefaultConcurrentReconciles[resourceKey]
		if !ok {
			defaultReconcileForResource = controller.DefaultReconcile
		}
		concurrencyConfig[resourceKey] = getConcurrencyConfigForResource(resourceEnvSuffix, defaultReconcileForResource)
	}
	return concurrencyConfig
}
40 changes: 11 additions & 29 deletions internal/controller/common_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -271,7 +271,6 @@ func initializeControllerForReconciliationTests(t *testing.T, items []ResourceAc
istioClient.PrependReactor("delete-collection", "*", getDeleteCollectionHandler(t, istioClient))
istioClient.PrependReactor("*", "*", getErrorReactorWithResources(items))

coreClient.PrependReactor("create", "*", generateNameCreateHandler)
coreClient.PrependReactor("create", "*", generateNameCreateHandler)
coreClient.PrependReactor("*", "*", getErrorReactorWithResources(items))

Expand All @@ -285,6 +284,7 @@ func initializeControllerForReconciliationTests(t *testing.T, items []ResourceAc

c := NewController(coreClient, copClient, istioClient, gardenerCertClient, certManagerClient, gardenerDNSClient, promopClient)
c.eventRecorder = events.NewFakeRecorder(10)

return c
}

Expand Down Expand Up @@ -509,7 +509,7 @@ func addInitialObjectToStore(resource []byte, c *Controller) error {
}

switch obj.(type) {
case *corev1.Secret, *corev1.Pod, *corev1.Namespace, *corev1.Service:
case *corev1.Secret, *corev1.Pod, *corev1.Namespace, *corev1.Service, *appsv1.Deployment, *batchv1.Job, *networkingv1.NetworkPolicy, *discoveryv1.EndpointSlice:
fakeClient, ok := c.kubeClient.(*k8sfake.Clientset)
if !ok {
return fmt.Errorf("controller is not using a fake clientset")
Expand All @@ -524,21 +524,16 @@ func addInitialObjectToStore(resource []byte, c *Controller) error {
err = c.kubeInformerFactory.Core().V1().Namespaces().Informer().GetIndexer().Add(obj)
case *corev1.Service:
err = c.kubeInformerFactory.Core().V1().Services().Informer().GetIndexer().Add(obj)

case *appsv1.Deployment:
err = c.kubeInformerFactory.Apps().V1().Deployments().Informer().GetIndexer().Add(obj)

case *batchv1.Job:
err = c.kubeInformerFactory.Batch().V1().Jobs().Informer().GetIndexer().Add(obj)

case *networkingv1.NetworkPolicy:
err = c.kubeInformerFactory.Networking().V1().NetworkPolicies().Informer().GetIndexer().Add(obj)
}
case *appsv1.Deployment:
fakeClient, ok := c.kubeClient.(*k8sfake.Clientset)
if !ok {
return fmt.Errorf("controller is not using a fake clientset")
}
fakeClient.Tracker().Add(obj)
err = c.kubeInformerFactory.Apps().V1().Deployments().Informer().GetIndexer().Add(obj)
case *batchv1.Job:
fakeClient, ok := c.kubeClient.(*k8sfake.Clientset)
if !ok {
return fmt.Errorf("controller is not using a fake clientset")
}
fakeClient.Tracker().Add(obj)
err = c.kubeInformerFactory.Batch().V1().Jobs().Informer().GetIndexer().Add(obj)
case *gardenercertv1alpha1.Certificate:
fakeClient, ok := c.gardenerCertificateClient.(*gardenercertfake.Clientset)
if !ok {
Expand All @@ -560,13 +555,6 @@ func addInitialObjectToStore(resource []byte, c *Controller) error {
}
fakeClient.Tracker().Add(obj)
err = c.gardenerDNSInformerFactory.Dns().V1alpha1().DNSEntries().Informer().GetIndexer().Add(obj)
case *networkingv1.NetworkPolicy:
fakeClient, ok := c.kubeClient.(*k8sfake.Clientset)
if !ok {
return fmt.Errorf("controller is not using a fake clientset")
}
fakeClient.Tracker().Add(obj)
err = c.kubeInformerFactory.Networking().V1().NetworkPolicies().Informer().GetIndexer().Add(obj)
case *istionwv1.Gateway, *istionwv1.VirtualService, *istionwv1.DestinationRule:
fakeClient, ok := c.istioClient.(*istiofake.Clientset)
if !ok {
Expand Down Expand Up @@ -613,12 +601,6 @@ func addInitialObjectToStore(resource []byte, c *Controller) error {
return fmt.Errorf("controller is not using a fake clientset")
}
fakeClient.Tracker().Add(obj)
case *discoveryv1.EndpointSlice:
fakeClient, ok := c.kubeClient.(*k8sfake.Clientset)
if !ok {
return fmt.Errorf("controller is not using a fake clientset")
}
fakeClient.Tracker().Add(obj)
default:
return fmt.Errorf("unknown object type")
}
Expand Down
73 changes: 59 additions & 14 deletions internal/controller/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ import (
"github.com/sap/cap-operator/pkg/client/clientset/versioned"
v1alpha1scheme "github.com/sap/cap-operator/pkg/client/clientset/versioned/scheme"
crdInformers "github.com/sap/cap-operator/pkg/client/informers/externalversions"
"golang.org/x/time/rate"
istio "istio.io/client-go/pkg/clientset/versioned"
istioscheme "istio.io/client-go/pkg/clientset/versioned/scheme"
istioInformers "istio.io/client-go/pkg/informers/externalversions"
Expand Down Expand Up @@ -56,17 +57,35 @@ type Controller struct {
eventRecorder events.EventRecorder
}

var (
	// DefaultReconcile is the fallback worker concurrency for resource kinds
	// without an entry in DefaultConcurrentReconciles. Application and Domain
	// resources are less frequently updated, so assume a default concurrency of 1.
	DefaultReconcile = 1
	// DefaultConcurrentReconciles maps a resource kind key to the number of
	// concurrent workers started for its queue (see getConcurrencyForResource).
	DefaultConcurrentReconciles = map[int]int{
		ResourceCAPApplicationVersion: 3,  // Moderate concurrency to handle multiple versions efficiently
		ResourceCAPTenant:             10, // High concurrency to handle multiple tenants efficiently
		ResourceCAPTenantOperation:    10, // High concurrency to handle multiple tenant operations efficiently
	}
	// ResourceEnvSuffixMap maps a resource kind key to the environment-variable
	// suffix used to override its concurrency (prefixed externally, e.g. with
	// MAX_CONCURRENT_RECONCILES_).
	ResourceEnvSuffixMap = map[int]string{
		ResourceCAPApplication:        "CAP_APPLICATION",
		ResourceCAPApplicationVersion: "CAP_APPLICATION_VERSION",
		ResourceCAPTenant:             "CAP_TENANT",
		ResourceCAPTenantOperation:    "CAP_TENANT_OPERATION",
		ResourceDomain:                "DOMAIN",
		ResourceClusterDomain:         "CLUSTER_DOMAIN",
	}
)

func NewController(client kubernetes.Interface, crdClient versioned.Interface, istioClient istio.Interface, gardenerCertificateClient gardenerCert.Interface, certManagerCertificateClient certManager.Interface, gardenerDNSClient gardenerDNS.Interface, promClient promop.Interface) *Controller {
// Register metrics provider on the workqueue
initializeMetrics()

queues := map[int]workqueue.TypedRateLimitingInterface[QueueItem]{
ResourceCAPApplication: workqueue.NewTypedRateLimitingQueueWithConfig(workqueue.DefaultTypedControllerRateLimiter[QueueItem](), workqueue.TypedRateLimitingQueueConfig[QueueItem]{Name: KindMap[ResourceCAPApplication]}),
ResourceCAPApplicationVersion: workqueue.NewTypedRateLimitingQueueWithConfig(workqueue.DefaultTypedControllerRateLimiter[QueueItem](), workqueue.TypedRateLimitingQueueConfig[QueueItem]{Name: KindMap[ResourceCAPApplicationVersion]}),
ResourceCAPTenant: workqueue.NewTypedRateLimitingQueueWithConfig(workqueue.DefaultTypedControllerRateLimiter[QueueItem](), workqueue.TypedRateLimitingQueueConfig[QueueItem]{Name: KindMap[ResourceCAPTenant]}),
ResourceCAPTenantOperation: workqueue.NewTypedRateLimitingQueueWithConfig(workqueue.DefaultTypedControllerRateLimiter[QueueItem](), workqueue.TypedRateLimitingQueueConfig[QueueItem]{Name: KindMap[ResourceCAPTenantOperation]}),
ResourceClusterDomain: workqueue.NewTypedRateLimitingQueueWithConfig(workqueue.DefaultTypedControllerRateLimiter[QueueItem](), workqueue.TypedRateLimitingQueueConfig[QueueItem]{Name: KindMap[ResourceClusterDomain]}),
ResourceCAPTenant: workqueue.NewTypedRateLimitingQueueWithConfig(customRateLimiter(), workqueue.TypedRateLimitingQueueConfig[QueueItem]{Name: KindMap[ResourceCAPTenant]}),
ResourceCAPTenantOperation: workqueue.NewTypedRateLimitingQueueWithConfig(customRateLimiter(), workqueue.TypedRateLimitingQueueConfig[QueueItem]{Name: KindMap[ResourceCAPTenantOperation]}),
ResourceDomain: workqueue.NewTypedRateLimitingQueueWithConfig(workqueue.DefaultTypedControllerRateLimiter[QueueItem](), workqueue.TypedRateLimitingQueueConfig[QueueItem]{Name: KindMap[ResourceDomain]}),
ResourceClusterDomain: workqueue.NewTypedRateLimitingQueueWithConfig(workqueue.DefaultTypedControllerRateLimiter[QueueItem](), workqueue.TypedRateLimitingQueueConfig[QueueItem]{Name: KindMap[ResourceClusterDomain]}),
}

// Use 30mins as the default Resync interval for kube / proprietary resources
Expand All @@ -89,8 +108,8 @@ func NewController(client kubernetes.Interface, crdClient versioned.Interface, i
// no activity needed on our side so far
}

// Use 60 as the default Resync interval for our custom resources (CAP CROs)
crdInformerFactory := crdInformers.NewSharedInformerFactory(crdClient, 60*time.Second)
// Use 5 mins as the default Resync interval for our custom resources (CAP CROs)
crdInformerFactory := crdInformers.NewSharedInformerFactory(crdClient, 5*time.Minute)

// initialize event recorder
scheme := runtime.NewScheme()
Expand Down Expand Up @@ -122,6 +141,21 @@ func NewController(client kubernetes.Interface, crdClient versioned.Interface, i
return c
}

// customRateLimiter builds the rate limiter used by the CAPTenant and
// CAPTenantOperation work queues, tuned for faster retries and higher
// throughput than the default controller rate limiter: per-item exponential
// backoff starting at 10ms and capped at 300s, combined with an overall
// token bucket of 50 QPS and a burst of 200.
func customRateLimiter() workqueue.TypedRateLimiter[QueueItem] {
	// Faster exponential backoff for transient errors
	backoff := workqueue.NewTypedItemExponentialFailureRateLimiter[QueueItem](10*time.Millisecond, 300*time.Second)
	// Higher QPS for bulk processing
	throughput := &workqueue.TypedBucketRateLimiter[QueueItem]{Limiter: rate.NewLimiter(rate.Limit(50), 200)}
	// The effective delay for an item is the stricter of the two limiters.
	return workqueue.NewTypedMaxOfRateLimiter(backoff, throughput)
}

func throwInformerStartError(resources map[reflect.Type]bool) {
for resource, ok := range resources {
if !ok {
Expand Down Expand Up @@ -179,15 +213,18 @@ func (c *Controller) Start(ctx context.Context) {

var wg sync.WaitGroup
for k := range c.queues {
wg.Add(1)
go func(key int) {
defer wg.Done()
err := c.processQueue(qCxt, key)
if err != nil {
klog.ErrorS(err, "worker queue ended with error", "key", key)
}
qCancel() // cancel context to inform other workers
}(k)
concurrency := getConcurrencyForResource(k)
for i := range concurrency {
wg.Add(1)
go func(key, workerId int) {
defer wg.Done()
err := c.processQueue(qCxt, key)
if err != nil {
klog.ErrorS(err, "worker queue ended with error", "key", key)
}
qCancel() // cancel context to inform other workers
}(k, i)
}
}

// start version cleanup routines
Expand All @@ -199,6 +236,14 @@ func (c *Controller) Start(ctx context.Context) {
wg.Wait()
}

// getConcurrencyForResource returns the number of concurrent workers
// configured for the given resource kind key, falling back to
// DefaultReconcile when DefaultConcurrentReconciles has no explicit entry.
func getConcurrencyForResource(key int) int {
	if workers, ok := DefaultConcurrentReconciles[key]; ok {
		return workers
	}
	// default concurrency
	return DefaultReconcile
}

func (c *Controller) processQueue(ctx context.Context, key int) error {
klog.InfoS("starting to process queue", "resource", getResourceKindFromKey(key))
for {
Expand Down
Loading