Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 9 additions & 0 deletions pkg/cloudprovider/providers/oci/ccm.go
Original file line number Diff line number Diff line change
Expand Up @@ -191,6 +191,14 @@ func (cp *CloudProvider) Initialize(clientBuilder cloudprovider.ControllerClient
cp.logger,
cp.instanceCache,
cp.client)
flexCIDRController := NewFlexCIDRController(
factory.Core().V1().Nodes(),
factory.Core().V1().Services(),
cp.kubeclient,
cp,
cp.logger,
cp.instanceCache,
cp.client)

nodeInformer := factory.Core().V1().Nodes()
go nodeInformer.Informer().Run(wait.NeverStop)
Expand All @@ -202,6 +210,7 @@ func (cp *CloudProvider) Initialize(clientBuilder cloudprovider.ControllerClient
go serviceAccountInformer.Informer().Run(wait.NeverStop)

go nodeInfoController.Run(wait.NeverStop)
go flexCIDRController.Run(wait.NeverStop)

// If the cluster is type OpenShift then the Tagging Controller
// should be enabled.
Expand Down
203 changes: 203 additions & 0 deletions pkg/cloudprovider/providers/oci/flex_cidr_controller.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,203 @@
package oci

import (
"context"
"fmt"
"time"

"github.com/oracle/oci-cloud-controller-manager/pkg/flexcidr"
"github.com/oracle/oci-cloud-controller-manager/pkg/oci/client"
"github.com/oracle/oci-go-sdk/v65/core"
"github.com/pkg/errors"
"go.uber.org/zap"
v1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/types"
utilruntime "k8s.io/apimachinery/pkg/util/runtime"
"k8s.io/apimachinery/pkg/util/wait"
coreinformers "k8s.io/client-go/informers/core/v1"
clientset "k8s.io/client-go/kubernetes"
"k8s.io/client-go/tools/cache"
"k8s.io/client-go/util/workqueue"
)

// flexCIDRRetryDelay is how long to wait before re-enqueueing a node whose
// backing instance is not yet running, or whose cluster IP family cannot be
// determined yet.
const flexCIDRRetryDelay = time.Minute

// FlexCIDRController watches Nodes and reconciles each node's
// Spec.PodCIDRs with the flex CIDR configuration derived from the node's
// OCI instance and its primary VNIC.
type FlexCIDRController struct {
	nodeInformer    coreinformers.NodeInformer    // source of node events; drives the work queue
	serviceInformer coreinformers.ServiceInformer // its lister is used to derive the cluster IP family
	kubeClient      clientset.Interface           // used to patch node podCIDRs
	cloud           *CloudProvider                // fallback lookup of a node's providerID
	queue           workqueue.RateLimitingInterface // holds node names pending reconciliation
	logger          *zap.SugaredLogger
	instanceCache   cache.Store      // fetched OCI instances are added here (best effort)
	ociClient       client.Interface // OCI compute/networking API access
}

// NewFlexCIDRController builds a FlexCIDRController and registers node
// add/update handlers that enqueue the affected node's name for
// reconciliation by the worker.
func NewFlexCIDRController(
	nodeInformer coreinformers.NodeInformer,
	serviceInformer coreinformers.ServiceInformer,
	kubeClient clientset.Interface,
	cloud *CloudProvider,
	logger *zap.SugaredLogger,
	instanceCache cache.Store,
	ociClient client.Interface) *FlexCIDRController {

	fcc := &FlexCIDRController{
		nodeInformer:    nodeInformer,
		serviceInformer: serviceInformer,
		kubeClient:      kubeClient,
		cloud:           cloud,
		queue:           workqueue.NewRateLimitingQueue(workqueue.DefaultControllerRateLimiter()),
		logger:          logger,
		instanceCache:   instanceCache,
		ociClient:       ociClient,
	}

	// Every create or update of a Node simply re-enqueues it by name; the
	// worker decides whether any change is actually needed.
	enqueueNode := func(obj interface{}) {
		fcc.queue.Add(obj.(*v1.Node).Name)
	}
	fcc.nodeInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
		AddFunc: enqueueNode,
		UpdateFunc: func(_, newObj interface{}) {
			enqueueNode(newObj)
		},
	})

	return fcc
}

// Run starts the controller. It blocks until stopCh is closed: it first
// waits for the node and service informer caches to sync, then runs a
// single worker loop. The work queue is shut down on exit.
func (fcc *FlexCIDRController) Run(stopCh <-chan struct{}) {
	defer utilruntime.HandleCrash()
	defer fcc.queue.ShutDown()

	fcc.logger.Info("Starting flex CIDR controller")

	synced := cache.WaitForCacheSync(stopCh,
		fcc.nodeInformer.Informer().HasSynced,
		fcc.serviceInformer.Informer().HasSynced)
	if !synced {
		utilruntime.HandleError(fmt.Errorf("timed out waiting for flex CIDR controller caches to sync"))
		return
	}

	wait.Until(fcc.runWorker, time.Second, stopCh)
}

// runWorker drains the work queue, returning only once the queue has been
// shut down.
func (fcc *FlexCIDRController) runWorker() {
	for {
		if !fcc.processNextItem() {
			return
		}
	}
}

// processNextItem pops one node name from the queue and reconciles it.
// On success the item's rate-limit history is forgotten; on error it is
// re-added with rate limiting. Returns false only when the queue has been
// shut down.
func (fcc *FlexCIDRController) processNextItem() bool {
	key, quit := fcc.queue.Get()
	if quit {
		return false
	}
	defer fcc.queue.Done(key)

	err := fcc.processItem(key.(string))
	if err == nil {
		fcc.queue.Forget(key)
		return true
	}

	fcc.logger.Errorf("Error processing flex CIDR for node %s (will retry): %v", key, err)
	fcc.queue.AddRateLimited(key)
	return true
}

// processItem reconciles one node's Spec.PodCIDRs with the flex CIDRs
// derived from its OCI instance. Returning a non-nil error triggers a
// rate-limited retry via processNextItem; transient "not ready" states are
// instead requeued after flexCIDRRetryDelay and return nil.
func (fcc *FlexCIDRController) processItem(key string) error {
	logger := fcc.logger.With("node", key)

	node, err := fcc.nodeInformer.Lister().Get(key)
	if err != nil {
		return err
	}

	// NOTE(review): this skips only when podCIDRs are set AND providerID is
	// empty; if the intent was "skip any node that already has podCIDRs",
	// the second condition looks inverted — confirm with the author.
	if len(node.Spec.PodCIDRs) > 0 && len(node.Spec.ProviderID) == 0 {
		logger.Debug("node already has podCIDRs but providerID is empty, skipping")
		return nil
	}

	instance, instanceID, err := fcc.getInstanceByNode(node, logger)
	if err != nil {
		return err
	}
	if instance == nil {
		return nil
	}

	// Best effort: populating the shared instance cache is not critical to
	// reconciliation, so a failure is only logged.
	if err := fcc.instanceCache.Add(instance); err != nil {
		logger.With(zap.Error(err)).Debug("failed to add instance to cache")
	}

	// A VNIC cannot be configured before the instance is RUNNING; retry later.
	if instance.LifecycleState != core.InstanceLifecycleStateRunning {
		logger.Infof("instance %s not running yet, requeueing", instanceID)
		fcc.queue.AddAfter(key, flexCIDRRetryDelay)
		return nil
	}

	// Nodes whose instance metadata carries no flex CIDR configuration are
	// simply not managed by this controller.
	config, hasConfig := flexcidr.ParsePrimaryVnicConfig(instance)
	if !hasConfig {
		logger.Debug("instance metadata does not include flex CIDR configuration, skipping")
		return nil
	}

	// The IP family is derived from cluster Services; it may not be
	// determinable early in cluster bring-up, so retry rather than fail.
	clusterIPFamily, err := flexcidr.GetClusterIpFamily(context.Background(), fcc.serviceInformer.Lister())
	if err != nil {
		logger.With(zap.Error(err)).Info("cluster IP family not ready yet, requeueing")
		fcc.queue.AddAfter(key, flexCIDRRetryDelay)
		return nil
	}

	primaryVNIC, err := fcc.ociClient.Compute().GetPrimaryVNICForInstance(context.Background(), *instance.CompartmentId, instanceID)
	if err != nil {
		return errors.Wrap(err, "GetPrimaryVNICForInstance")
	}

	// NOTE(review): Networking(nil) — presumably nil selects a default OCI
	// networking client configuration; confirm against client.Interface.
	flexCIDRManager := &flexcidr.FlexCIDR{
		Logger:            logger,
		PrimaryVnicConfig: config,
		ClusterIpFamily:   clusterIPFamily,
		OciCoreClient:     fcc.ociClient.Networking(nil),
	}

	flexCIDRs, err := flexCIDRManager.GetOrCreateFlexCidrList(*primaryVNIC.Id)
	if err != nil {
		return err
	}
	if !flexCIDRManager.ValidateFlexCidrList(flexCIDRs) {
		return fmt.Errorf("computed flex CIDRs %v are invalid", flexCIDRs)
	}
	// Order-insensitive comparison: avoid a no-op patch when the node
	// already carries the expected CIDRs.
	if flexcidr.StringSlicesEqualIgnoreOrder(node.Spec.PodCIDRs, flexCIDRs) {
		logger.Debugf("node already has expected podCIDRs %v", flexCIDRs)
		return nil
	}

	return flexcidr.PatchNodePodCIDRs(context.Background(), fcc.kubeClient, node.Name, flexCIDRs, logger)
}

// getInstanceByNode resolves the OCI instance backing the given node and
// returns it together with its instance OCID. When the node does not yet
// carry a providerID, the cloud provider is asked for one by node name.
func (fcc *FlexCIDRController) getInstanceByNode(node *v1.Node, logger *zap.SugaredLogger) (*core.Instance, string, error) {
	ctx, cancel := context.WithTimeout(context.Background(), timeout)
	defer cancel()

	providerID := node.Spec.ProviderID
	if providerID == "" {
		// Fall back to the cloud provider's instance lookup by node name.
		resolved, err := fcc.cloud.InstanceID(ctx, types.NodeName(node.Name))
		if err != nil {
			return nil, "", err
		}
		providerID = resolved
	}

	instanceID, err := MapProviderIDToResourceID(providerID)
	if err != nil {
		logger.With(zap.Error(err)).Error("failed to map providerID to instanceID")
		return nil, "", err
	}

	instance, err := fcc.ociClient.Compute().GetInstance(ctx, instanceID)
	if err != nil {
		logger.With(zap.Error(err)).Error("failed to fetch instance")
		return nil, "", err
	}

	return instance, instanceID, nil
}
12 changes: 12 additions & 0 deletions pkg/cloudprovider/providers/oci/instances_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1037,6 +1037,10 @@
return nil, nil
}

// CreatePrivateIpWithRequest is a no-op mock; it returns a zero-value
// PrivateIp and no error.
func (c *MockVirtualNetworkClient) CreatePrivateIpWithRequest(ctx context.Context, request core.CreatePrivateIpRequest) (core.PrivateIp, error) {
	var ip core.PrivateIp
	return ip, nil
}

// GetIpv6 is a no-op mock; it returns a pointer to a zero-value Ipv6 and
// no error.
func (c *MockVirtualNetworkClient) GetIpv6(ctx context.Context, id string) (*core.Ipv6, error) {
	ip := core.Ipv6{}
	return &ip, nil
}
Expand All @@ -1057,6 +1061,10 @@
return &core.Ipv6{}, nil
}

// CreateIpv6WithRequest is a no-op mock; it returns a zero-value Ipv6 and
// no error.
func (c *MockVirtualNetworkClient) CreateIpv6WithRequest(ctx context.Context, request core.CreateIpv6Request) (core.Ipv6, error) {
	var ip core.Ipv6
	return ip, nil
}

func (c *MockVirtualNetworkClient) GetSubnet(ctx context.Context, id string) (*core.Subnet, error) {
if subnet, ok := subnets[id]; ok {
return subnet, nil
Expand Down Expand Up @@ -1423,6 +1431,10 @@
return nil, nil
}

// NOTE(review): the duplicate declaration of
// MockIdentityClient.ListAvailabilityDomains that appeared here was removed.
// The CI build fails with "method MockIdentityClient.ListAvailabilityDomains
// already declared at pkg/cloudprovider/providers/oci/instances_test.go:1425",
// i.e. an identical mock method already exists earlier in this file; keeping
// both does not compile.

// mockInstanceCache is a test stub for the instance cache — presumably a
// cache.Store implementation; TODO confirm against its method set below.
type mockInstanceCache struct{}

func (m mockInstanceCache) Add(obj interface{}) error {
Expand Down
16 changes: 16 additions & 0 deletions pkg/csi/driver/bv_controller_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -487,10 +487,26 @@ func (c *MockVirtualNetworkClient) CreatePrivateIp(ctx context.Context, vnicId s
return &core.PrivateIp{}, nil
}

// CreatePrivateIpWithRequest is a no-op mock; it returns a zero-value
// PrivateIp and no error.
func (c *MockVirtualNetworkClient) CreatePrivateIpWithRequest(ctx context.Context, request core.CreatePrivateIpRequest) (core.PrivateIp, error) {
	var ip core.PrivateIp
	return ip, nil
}

// ListPrivateIps is a no-op mock; it returns an empty slice and no error.
func (c *MockVirtualNetworkClient) ListPrivateIps(ctx context.Context, id string) ([]core.PrivateIp, error) {
	ips := []core.PrivateIp{}
	return ips, nil
}

// ListIpv6s is a no-op mock; it returns an empty slice and no error.
func (c *MockVirtualNetworkClient) ListIpv6s(ctx context.Context, vnicId string) ([]core.Ipv6, error) {
	ips := []core.Ipv6{}
	return ips, nil
}

// CreateIpv6 is a no-op mock; it returns a pointer to a zero-value Ipv6
// and no error.
func (c *MockVirtualNetworkClient) CreateIpv6(ctx context.Context, vnicID string) (*core.Ipv6, error) {
	ip := core.Ipv6{}
	return &ip, nil
}

// CreateIpv6WithRequest is a no-op mock; it returns a zero-value Ipv6 and
// no error.
func (c *MockVirtualNetworkClient) CreateIpv6WithRequest(ctx context.Context, request core.CreateIpv6Request) (core.Ipv6, error) {
	var ip core.Ipv6
	return ip, nil
}

func (c *MockVirtualNetworkClient) GetSubnet(ctx context.Context, id string) (*core.Subnet, error) {
if strings.EqualFold(id, "ocid1.invalid-subnet") {
return nil, errors.New("Internal Error.")
Expand Down
Loading
Loading