Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ KUSTOMIZE_VERSION=v4.5.2
CONTROLLER_GEN_VERSION=v0.16.4
GO_LICENSES_VERSION=v1.6.0
GINKGO_VERSION = $(shell cat go.mod | grep 'github.com/onsi/ginkgo' | sed 's/.*\(v.*\)$$/\1/g')
KIND_VERSION=v0.23.0
KIND_VERSION=v0.30.0
YQ_VERSION=v4.33.3
CONTROLLER_RUNTIME_VERSION = $(shell cat go.mod | grep 'sigs.k8s.io/controller-runtime' | sed 's/.*\(v\(.*\)\.[^.]*\)$$/\2/g')
# ENVTEST_K8S_VERSION refers to the version of kubebuilder assets to be downloaded by envtest binary.
Expand Down
18 changes: 18 additions & 0 deletions config/rbac/role.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,16 @@ rules:
- ""
resources:
- persistentvolumeclaims
verbs:
- delete
- get
- list
- patch
- update
- watch
- apiGroups:
- ""
resources:
- pods
verbs:
- delete
Expand Down Expand Up @@ -144,6 +154,14 @@ rules:
- get
- patch
- update
- apiGroups:
- storage.k8s.io
resources:
- storageclasses
verbs:
- get
- list
- watch
- apiGroups:
- zookeeper.pravega.io
resources:
Expand Down
107 changes: 103 additions & 4 deletions controllers/solr_cluster_ops_util.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,18 +21,20 @@ import (
"context"
"encoding/json"
"errors"
"net/url"
"strconv"
"time"

solrv1beta1 "github.com/apache/solr-operator/api/v1beta1"
"github.com/apache/solr-operator/controllers/util"
"github.com/apache/solr-operator/controllers/util/solr_api"
"github.com/go-logr/logr"
appsv1 "k8s.io/api/apps/v1"
corev1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/api/resource"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/utils/pointer"
"net/url"
"sigs.k8s.io/controller-runtime/pkg/client"
"strconv"
"time"
)

// SolrClusterOp contains metadata for cluster operations performed on SolrClouds.
Expand All @@ -53,6 +55,7 @@ const (
ScaleUpLock SolrClusterOperationType = "ScalingUp"
UpdateLock SolrClusterOperationType = "RollingUpdate"
BalanceReplicasLock SolrClusterOperationType = "BalanceReplicas"
PvcExpansionLock SolrClusterOperationType = "PVCExpansion"
)

// RollingUpdateMetadata contains metadata for rolling update cluster operations.
Expand Down Expand Up @@ -150,6 +153,101 @@ func retryNextQueuedClusterOpWithQueue(statefulSet *appsv1.StatefulSet, clusterO
return hasOp, err
}

func determinePvcExpansionClusterOpLockIfNecessary(ctx context.Context, r *SolrCloudReconciler, instance *solrv1beta1.SolrCloud, statefulSet *appsv1.StatefulSet, logger logr.Logger) (clusterOp *SolrClusterOp, retryLaterDuration time.Duration, err error) {
if instance.Spec.StorageOptions.PersistentStorage == nil ||
instance.Spec.StorageOptions.PersistentStorage.PersistentVolumeClaimTemplate.Spec.Resources.Requests.Storage() == nil {
return
}
newSize := instance.Spec.StorageOptions.PersistentStorage.PersistentVolumeClaimTemplate.Spec.Resources.Requests.Storage()
// If there is no old size to update, the StatefulSet can just be set to use the new PVC size without any issue.
// Only do a cluster operation if we are expanding from an existing size to a new size.
oldSizeStr, hasOldSize := statefulSet.Annotations[util.StorageMinimumSizeAnnotation]
if !hasOldSize || newSize.String() == oldSizeStr {
return
}
oldSize, e := resource.ParseQuantity(oldSizeStr)
if e != nil {
err = e
logger.Error(err, "Could not parse the existing minimum PVC size from the StatefulSet annotation", "annotation", util.StorageMinimumSizeAnnotation, "value", oldSizeStr)
if r.Recorder != nil {
r.Recorder.Eventf(instance, corev1.EventTypeWarning, "PVCExpansionError",
"Could not parse the existing minimum data PVC size %q recorded on the StatefulSet: %v", oldSizeStr, e)
}
return
}
// PVCs cannot be shrunk, so only proceed if the new size is strictly bigger than the recorded size.
if newSize.Cmp(oldSize) <= 0 {
logger.Info("Cannot shrink existing data PVCs; ignoring the decreased storage request", "currentSize", oldSize.String(), "requestedSize", newSize.String())
if r.Recorder != nil {
r.Recorder.Eventf(instance, corev1.EventTypeWarning, "PVCExpansionForbidden",
"Cannot shrink data PersistentVolumeClaims from %s to %s; PersistentVolumeClaims can only be expanded.", oldSize.String(), newSize.String())
}
return
}
// Pre-flight: make sure the storage class backing the data PVCs allows volume expansion. If it
// explicitly does not, there is no point acquiring a cluster operation lock that can never
// complete; surface it as an event instead.
if allowed, className, scErr := r.storageClassAllowsExpansion(ctx, instance, statefulSet.Spec.Selector.MatchLabels); scErr != nil {
// Could not determine; proceed best-effort and let the PVC patch surface any hard rejection.
logger.Error(scErr, "Could not verify whether the storage class allows volume expansion; proceeding with the expansion attempt")
} else if !allowed {
logger.Info("Storage class does not allow volume expansion; ignoring the increased storage request", "storageClass", className, "currentSize", oldSize.String(), "requestedSize", newSize.String())
if r.Recorder != nil {
r.Recorder.Eventf(instance, corev1.EventTypeWarning, "PVCExpansionForbidden",
"Storage class %q does not allow volume expansion (allowVolumeExpansion); cannot expand data PersistentVolumeClaims from %s to %s.", className, oldSize.String(), newSize.String())
}
return
}
clusterOp = &SolrClusterOp{
Operation: PvcExpansionLock,
Metadata: newSize.String(),
}
return
}

// handlePvcExpansion handles the logic of a persistent volume claim expansion operation.
func handlePvcExpansion(ctx context.Context, r *SolrCloudReconciler, instance *solrv1beta1.SolrCloud, statefulSet *appsv1.StatefulSet, clusterOp *SolrClusterOp, logger logr.Logger) (operationComplete bool, retryLaterDuration time.Duration, err error) {
var newSize resource.Quantity
newSize, err = resource.ParseQuantity(clusterOp.Metadata)
if err != nil {
logger.Error(err, "Could not convert PvcExpansion metadata to a resource.Quantity, as it represents the new size of PVCs", "metadata", clusterOp.Metadata)
return
}
var resizeInfeasible bool
operationComplete, resizeInfeasible, err = r.expandPVCs(ctx, instance, statefulSet.Spec.Selector.MatchLabels, newSize, logger)
if err == nil && operationComplete {
originalStatefulSet := statefulSet.DeepCopy()
statefulSet.Annotations[util.StorageMinimumSizeAnnotation] = newSize.String()
if statefulSet.Spec.Template.Annotations == nil {
statefulSet.Spec.Template.Annotations = make(map[string]string, 1)
}
statefulSet.Spec.Template.Annotations[util.StorageMinimumSizeAnnotation] = newSize.String()
if err = r.Patch(ctx, statefulSet, client.StrategicMergeFrom(originalStatefulSet)); err != nil {
logger.Error(err, "Error while patching StatefulSet to set the new minimum PVC size after PVCs the completion of PVC resizing", "newSize", newSize)
operationComplete = false
} else {
logger.Info("All PersistentVolumeClaims have been expanded, now issuing a rolling restart", "statefulSet", statefulSet.Name)
}
// Return and wait for the StatefulSet to be updated which will call the reconcile to start the rolling restart
retryLaterDuration = 0
} else if err == nil {
if resizeInfeasible {
// The storage backend has declared the requested size infeasible. There is nothing the
// operator can do until the user lowers the requested size, so surface it as an event and
// back off significantly instead of retrying tightly.
if r.Recorder != nil {
r.Recorder.Eventf(instance, corev1.EventTypeWarning, "PVCExpansionInfeasible",
"The storage backend reported that expanding the data PersistentVolumeClaims to %s is infeasible (e.g. it exceeds backend or quota limits). Reduce the requested storage size to a feasible value to recover.",
newSize.String())
}
retryLaterDuration = time.Minute
} else {
retryLaterDuration = time.Second * 5
}
}
return
}

func determineScaleClusterOpLockIfNecessary(ctx context.Context, r *SolrCloudReconciler, instance *solrv1beta1.SolrCloud, statefulSet *appsv1.StatefulSet, scaleDownOpIsQueued bool, podList []corev1.Pod, blockReconciliationOfStatefulSet bool, logger logr.Logger) (clusterOp *SolrClusterOp, retryLaterDuration time.Duration, err error) {
desiredPods := int(*instance.Spec.Replicas)
configuredPods := int(*statefulSet.Spec.Replicas)
Expand Down Expand Up @@ -291,7 +389,8 @@ func cleanupManagedCloudScaleDown(ctx context.Context, r *SolrCloudReconciler, p
// handleManagedCloudScaleUp does the logic of a managed and "locked" cloud scale up operation.
// This will likely take many reconcile loops to complete, as it is moving replicas to the pods that have recently been scaled up.
func handleManagedCloudScaleUp(ctx context.Context, r *SolrCloudReconciler, instance *solrv1beta1.SolrCloud, statefulSet *appsv1.StatefulSet, clusterOp *SolrClusterOp, podList []corev1.Pod, logger logr.Logger) (operationComplete bool, nextClusterOperation *SolrClusterOp, err error) {
desiredPods, err := strconv.Atoi(clusterOp.Metadata)
desiredPods := 0
desiredPods, err = strconv.Atoi(clusterOp.Metadata)
if err != nil {
logger.Error(err, "Could not convert ScaleUp metadata to int, as it represents the number of nodes to scale to", "metadata", clusterOp.Metadata)
return
Expand Down
94 changes: 94 additions & 0 deletions controllers/solr_pvc_expansion_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,94 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package controllers

import (
"testing"

corev1 "k8s.io/api/core/v1"
)

// pvcWithCondition builds a PVC carrying a single resize condition.
func pvcWithCondition(condType corev1.PersistentVolumeClaimConditionType, status corev1.ConditionStatus) *corev1.PersistentVolumeClaim {
return &corev1.PersistentVolumeClaim{
Status: corev1.PersistentVolumeClaimStatus{
Conditions: []corev1.PersistentVolumeClaimCondition{{Type: condType, Status: status}},
},
}
}

// pvcWithAllocatedStatus builds a PVC carrying a storage allocatedResourceStatus.
func pvcWithAllocatedStatus(status corev1.ClaimResourceStatus) *corev1.PersistentVolumeClaim {
return &corev1.PersistentVolumeClaim{
Status: corev1.PersistentVolumeClaimStatus{
AllocatedResourceStatuses: map[corev1.ResourceName]corev1.ClaimResourceStatus{
corev1.ResourceStorage: status,
},
},
}
}

// TestPvcControllerExpansionComplete verifies that the controller-side expansion is reported as
// complete for the "offline" provisioner signals (FileSystemResizePending condition or a pending/
// in-progress node resize status), so that the rolling restart is not gated on status.capacity.
func TestPvcControllerExpansionComplete(t *testing.T) {
cases := []struct {
name string
pvc *corev1.PersistentVolumeClaim
want bool
}{
{"empty pvc", &corev1.PersistentVolumeClaim{}, false},
{"filesystem resize pending (offline ready-to-restart)", pvcWithCondition(corev1.PersistentVolumeClaimFileSystemResizePending, corev1.ConditionTrue), true},
{"filesystem resize pending but condition false", pvcWithCondition(corev1.PersistentVolumeClaimFileSystemResizePending, corev1.ConditionFalse), false},
{"unrelated resizing condition", pvcWithCondition(corev1.PersistentVolumeClaimResizing, corev1.ConditionTrue), false},
{"node resize pending status", pvcWithAllocatedStatus(corev1.PersistentVolumeClaimNodeResizePending), true},
{"node resize in progress status", pvcWithAllocatedStatus(corev1.PersistentVolumeClaimNodeResizeInProgress), true},
{"controller resize in progress status", pvcWithAllocatedStatus(corev1.PersistentVolumeClaimControllerResizeInProgress), false},
{"controller resize infeasible status", pvcWithAllocatedStatus(corev1.PersistentVolumeClaimControllerResizeInfeasible), false},
}
for _, tc := range cases {
t.Run(tc.name, func(t *testing.T) {
if got := pvcControllerExpansionComplete(tc.pvc); got != tc.want {
t.Errorf("pvcControllerExpansionComplete() = %v, want %v", got, tc.want)
}
})
}
}

// TestPvcResizeInfeasible verifies that a backend-declared infeasible expansion is detected from the
// allocatedResourceStatuses (best-effort; populated on Kubernetes >= 1.34).
func TestPvcResizeInfeasible(t *testing.T) {
cases := []struct {
name string
pvc *corev1.PersistentVolumeClaim
want bool
}{
{"empty pvc", &corev1.PersistentVolumeClaim{}, false},
{"controller resize infeasible", pvcWithAllocatedStatus(corev1.PersistentVolumeClaimControllerResizeInfeasible), true},
{"node resize infeasible", pvcWithAllocatedStatus(corev1.PersistentVolumeClaimNodeResizeInfeasible), true},
{"node resize pending is not infeasible", pvcWithAllocatedStatus(corev1.PersistentVolumeClaimNodeResizePending), false},
{"controller resize in progress is not infeasible", pvcWithAllocatedStatus(corev1.PersistentVolumeClaimControllerResizeInProgress), false},
}
for _, tc := range cases {
t.Run(tc.name, func(t *testing.T) {
if got := pvcResizeInfeasible(tc.pvc); got != tc.want {
t.Errorf("pvcResizeInfeasible() = %v, want %v", got, tc.want)
}
})
}
}
Loading
Loading