Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
13 changes: 13 additions & 0 deletions api/v1alpha1/clickhousecluster_types.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,8 +14,18 @@ import (
"k8s.io/apimachinery/pkg/types"

"github.com/ClickHouse/clickhouse-operator/internal/controllerutil"

networkingv1 "k8s.io/api/networking/v1"
)

type NetworkPolicySpec struct {
Enabled bool `json:"enabled,omitempty"`
// TODO: add Cilium
Backend string `json:"backend,omitempty"`
AllowedClients []networkingv1.NetworkPolicyPeer `json:"allowedClients,omitempty"`
MonitoringPeers []networkingv1.NetworkPolicyPeer `json:"monitoringPeers,omitempty"`
}

// ClickHouseClusterSpec defines the desired state of ClickHouseCluster.
type ClickHouseClusterSpec struct {
// Number of replicas in the single shard.
Expand Down Expand Up @@ -63,6 +73,9 @@ type ClickHouseClusterSpec struct {
// +optional
PodDisruptionBudget *PodDisruptionBudgetSpec `json:"podDisruptionBudget,omitempty"`

// NetworkPolicy configures network policy of cluster.
NetworkPolicy *NetworkPolicySpec `json:"networkPolicy,omitempty"`

// Configuration parameters for ClickHouse server.
// +optional
Settings ClickHouseSettings `json:"settings,omitempty"`
Expand Down
73 changes: 54 additions & 19 deletions api/v1alpha1/zz_generated.deepcopy.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

5 changes: 4 additions & 1 deletion internal/controller/clickhouse/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ import (
"github.com/ClickHouse/clickhouse-operator/internal/controllerutil"
"github.com/ClickHouse/clickhouse-operator/internal/upgrade"
webhookv1 "github.com/ClickHouse/clickhouse-operator/internal/webhook/v1alpha1"
networkingv1 "k8s.io/api/networking/v1"
)

// ClusterController reconciles a ClickHouseCluster object.
Expand Down Expand Up @@ -62,6 +63,7 @@ func keeperReferenceFieldValue(cluster *v1.ClickHouseCluster) []string {
// +kubebuilder:rbac:groups=policy,resources=poddisruptionbudgets,verbs=get;list;watch;create;update;delete
// +kubebuilder:rbac:groups=events.k8s.io,resources=events,verbs=create;patch
// +kubebuilder:rbac:groups=batch,resources=jobs,verbs=get;list;watch;create;delete
// +kubebuilder:rbac:groups=networking.k8s.io,resources=networkpolicies,verbs=get;list;watch;create;update;delete

// Reconcile is part of the main kubernetes reconciliation loop which aims to
// move the current state of the cluster closer to the desired state.
Expand Down Expand Up @@ -189,7 +191,8 @@ func SetupWithManager(mgr ctrl.Manager, log controllerutil.Logger, checker *upgr
Owns(&corev1.Secret{}).
Owns(&corev1.Service{}).
Owns(&corev1.Pod{}).
Owns(&batchv1.Job{})
Owns(&batchv1.Job{}).
Owns(&networkingv1.NetworkPolicy{})

if enablePDB {
controllerBuilder = controllerBuilder.Owns(&policyv1.PodDisruptionBudget{})
Expand Down
74 changes: 74 additions & 0 deletions internal/controller/clickhouse/networkpolicy.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,74 @@
package clickhouse

import (
corev1 "k8s.io/api/core/v1"
networkingv1 "k8s.io/api/networking/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/util/intstr"

v1 "github.com/ClickHouse/clickhouse-operator/api/v1alpha1"
"github.com/ClickHouse/clickhouse-operator/internal/controllerutil"
)

func templateNetworkPolicy(cr *v1.ClickHouseCluster) *networkingv1.NetworkPolicy {
app := cr.SpecificName()

tcp := func(port int32) networkingv1.NetworkPolicyPort {
proto := corev1.ProtocolTCP
p := intstr.FromInt32(port)

return networkingv1.NetworkPolicyPort{Protocol: &proto, Port: &p}
}

self := networkingv1.NetworkPolicyPeer{
PodSelector: &metav1.LabelSelector{
MatchLabels: map[string]string{controllerutil.LabelAppKey: app},
},
}

clusterNamespace := networkingv1.NetworkPolicyPeer{
NamespaceSelector: &metav1.LabelSelector{
MatchLabels: map[string]string{"kubernetes.io/metadata.name": cr.Namespace},
},
}

ingress := []networkingv1.NetworkPolicyIngressRule{
{
From: []networkingv1.NetworkPolicyPeer{self},
Ports: []networkingv1.NetworkPolicyPort{tcp(PortInterserver), tcp(PortNative)},
},
{
From: append([]networkingv1.NetworkPolicyPeer{clusterNamespace}, cr.Spec.NetworkPolicy.AllowedClients...),
Ports: []networkingv1.NetworkPolicyPort{tcp(PortNative), tcp(PortHTTP)},
},
}

if peers := cr.Spec.NetworkPolicy.MonitoringPeers; len(peers) > 0 {
ingress = append(ingress, networkingv1.NetworkPolicyIngressRule{
From: peers,
Ports: []networkingv1.NetworkPolicyPort{tcp(PortPrometheusScrape)},
})
}

return &networkingv1.NetworkPolicy{
TypeMeta: metav1.TypeMeta{
Kind: "NetworkPolicy",
APIVersion: "networking.k8s.io/v1",
},
ObjectMeta: metav1.ObjectMeta{
Name: app,
Namespace: cr.Namespace,
Labels: controllerutil.MergeMaps(cr.Spec.Labels, map[string]string{
controllerutil.LabelAppKey: app,
}),
Annotations: controllerutil.MergeMaps(cr.Spec.Annotations),
},
Spec: networkingv1.NetworkPolicySpec{
PodSelector: metav1.LabelSelector{
MatchLabels: map[string]string{controllerutil.LabelAppKey: app},
},
PolicyTypes: []networkingv1.PolicyType{networkingv1.PolicyTypeIngress},
Ingress: ingress,
},
}
}
34 changes: 34 additions & 0 deletions internal/controller/clickhouse/sync.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ import (

appsv1 "k8s.io/api/apps/v1"
corev1 "k8s.io/api/core/v1"
networkingv1 "k8s.io/api/networking/v1"
policyv1 "k8s.io/api/policy/v1"
k8serrors "k8s.io/apimachinery/pkg/api/errors"
"k8s.io/apimachinery/pkg/api/meta"
Expand Down Expand Up @@ -131,6 +132,7 @@ func (r *clickhouseReconciler) sync(ctx context.Context, log ctrlutil.Logger) (c
{Name: "ExternalSecret", Fn: r.reconcileExternalSecret, Always: true},
{Name: "ActiveReplicaStatus", Fn: r.reconcileActiveReplicaStatus, Always: true},
{Name: "ClusterRevisions", Fn: r.reconcileClusterRevisions, Always: true},
{Name: "NetworkPolicy", Fn: r.reconcileNetworkPolicy, Always: true},
{Name: "ReplicaResources", Fn: r.reconcileReplicaResources},
{Name: "DatabaseSync", Fn: r.reconcileDatabaseSync},
{Name: "CleanUp", Fn: r.reconcileCleanUp},
Expand Down Expand Up @@ -879,6 +881,38 @@ func (r *clickhouseReconciler) reconcileCleanUp(ctx context.Context, log ctrluti
return chctrl.StepContinue(), nil
}

func (r *clickhouseReconciler) reconcileNetworkPolicy(ctx context.Context, log ctrlutil.Logger) (chctrl.StepResult, error) {
np := r.Cluster.Spec.NetworkPolicy

if np != nil && np.Enabled {
desired := templateNetworkPolicy(r.Cluster)
if _, err := r.ReconcileResource(ctx, log, desired, []string{"Spec"}, v1.EventActionReconciling); err != nil {
return chctrl.StepResult{}, fmt.Errorf("reconcile NetworkPolicy: %w", err)
}

return chctrl.StepContinue(), nil
}

existing := &networkingv1.NetworkPolicy{}
err := r.GetClient().Get(ctx, types.NamespacedName{
Namespace: r.Cluster.Namespace,
Name: r.Cluster.SpecificName(),
}, existing)
if k8serrors.IsNotFound(err) {
return chctrl.StepContinue(), nil
}

if err != nil {
return chctrl.StepResult{}, fmt.Errorf("get NetworkPolicy: %w", err)
}

if err := r.Delete(ctx, existing, v1.EventActionReconciling); err != nil {
return chctrl.StepResult{}, fmt.Errorf("delete NetworkPolicy: %w", err)
}

return chctrl.StepContinue(), nil
}

func (r *clickhouseReconciler) evaluateReplicaConditions() {
var (
errorIDs, notReadyIDs []string
Expand Down
Loading
Loading