From 9b61376092d93f01749fe6595804306579c8d7d1 Mon Sep 17 00:00:00 2001 From: Konstantin Vedernikov Date: Tue, 2 Jun 2026 23:05:59 +0000 Subject: [PATCH] start --- api/v1alpha1/clickhousecluster_types.go | 13 ++ api/v1alpha1/zz_generated.deepcopy.go | 73 ++++++-- internal/controller/clickhouse/controller.go | 5 +- .../controller/clickhouse/networkpolicy.go | 74 ++++++++ internal/controller/clickhouse/sync.go | 34 ++++ test/e2e/networkpolicy_e2e_test.go | 172 ++++++++++++++++++ 6 files changed, 351 insertions(+), 20 deletions(-) create mode 100644 internal/controller/clickhouse/networkpolicy.go create mode 100644 test/e2e/networkpolicy_e2e_test.go diff --git a/api/v1alpha1/clickhousecluster_types.go b/api/v1alpha1/clickhousecluster_types.go index e40daf25..64f66229 100644 --- a/api/v1alpha1/clickhousecluster_types.go +++ b/api/v1alpha1/clickhousecluster_types.go @@ -14,8 +14,18 @@ import ( "k8s.io/apimachinery/pkg/types" "github.com/ClickHouse/clickhouse-operator/internal/controllerutil" + + networkingv1 "k8s.io/api/networking/v1" ) +type NetworkPolicySpec struct { + Enabled bool `json:"enabled,omitempty"` + // TODO: add Cilium + Backend string `json:"backend,omitempty"` + AllowedClients []networkingv1.NetworkPolicyPeer `json:"allowedClients,omitempty"` + MonitoringPeers []networkingv1.NetworkPolicyPeer `json:"monitoringPeers,omitempty"` +} + // ClickHouseClusterSpec defines the desired state of ClickHouseCluster. type ClickHouseClusterSpec struct { // Number of replicas in the single shard. @@ -63,6 +73,9 @@ type ClickHouseClusterSpec struct { // +optional PodDisruptionBudget *PodDisruptionBudgetSpec `json:"podDisruptionBudget,omitempty"` + // NetworkPolicy configures network policy of cluster. + NetworkPolicy *NetworkPolicySpec `json:"networkPolicy,omitempty"` + // Configuration parameters for ClickHouse server. // +optional Settings ClickHouseSettings `json:"settings,omitempty"` diff --git a/api/v1alpha1/zz_generated.deepcopy.go b/api/v1alpha1/zz_generated.deepcopy.go index 7dc5dc61..d08cccae 100644 --- a/api/v1alpha1/zz_generated.deepcopy.go +++ b/api/v1alpha1/zz_generated.deepcopy.go @@ -21,7 +21,8 @@ limitations under the License. package v1alpha1 import ( - "k8s.io/api/core/v1" + corev1 "k8s.io/api/core/v1" + "k8s.io/api/networking/v1" policyv1 "k8s.io/api/policy/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/runtime" @@ -120,7 +121,7 @@ func (in *ClickHouseClusterSpec) DeepCopyInto(out *ClickHouseClusterSpec) { in.ContainerTemplate.DeepCopyInto(&out.ContainerTemplate) if in.DataVolumeClaimSpec != nil { in, out := &in.DataVolumeClaimSpec, &out.DataVolumeClaimSpec - *out = new(v1.PersistentVolumeClaimSpec) + *out = new(corev1.PersistentVolumeClaimSpec) (*in).DeepCopyInto(*out) } if in.Labels != nil { @@ -142,6 +143,11 @@ func (in *ClickHouseClusterSpec) DeepCopyInto(out *ClickHouseClusterSpec) { *out = new(PodDisruptionBudgetSpec) (*in).DeepCopyInto(*out) } + if in.NetworkPolicy != nil { + in, out := &in.NetworkPolicy, &out.NetworkPolicy + *out = new(NetworkPolicySpec) + (*in).DeepCopyInto(*out) + } in.Settings.DeepCopyInto(&out.Settings) if in.VersionProbeTemplate != nil { in, out := &in.VersionProbeTemplate, &out.VersionProbeTemplate @@ -221,7 +227,7 @@ func (in *ClusterTLSSpec) DeepCopyInto(out *ClusterTLSSpec) { *out = *in if in.ServerCertSecret != nil { in, out := &in.ServerCertSecret, &out.ServerCertSecret - *out = new(v1.LocalObjectReference) + *out = new(corev1.LocalObjectReference) **out = **in } if in.CABundle != nil { @@ -278,31 +284,31 @@ func (in *ContainerTemplateSpec) DeepCopyInto(out *ContainerTemplateSpec) { in.Resources.DeepCopyInto(&out.Resources) if in.VolumeMounts != nil { in, out := &in.VolumeMounts, &out.VolumeMounts - *out = make([]v1.VolumeMount, len(*in)) + *out = make([]corev1.VolumeMount, len(*in)) for i := range *in { (*in)[i].DeepCopyInto(&(*out)[i]) } } if in.Env != nil { in, out := &in.Env, &out.Env - *out = make([]v1.EnvVar, len(*in)) + *out = make([]corev1.EnvVar, len(*in)) for i := range *in { (*in)[i].DeepCopyInto(&(*out)[i]) } } if in.SecurityContext != nil { in, out := &in.SecurityContext, &out.SecurityContext - *out = new(v1.SecurityContext) + *out = new(corev1.SecurityContext) (*in).DeepCopyInto(*out) } if in.LivenessProbe != nil { in, out := &in.LivenessProbe, &out.LivenessProbe - *out = new(v1.Probe) + *out = new(corev1.Probe) (*in).DeepCopyInto(*out) } if in.ReadinessProbe != nil { in, out := &in.ReadinessProbe, &out.ReadinessProbe - *out = new(v1.Probe) + *out = new(corev1.Probe) (*in).DeepCopyInto(*out) } } @@ -443,7 +449,7 @@ func (in *KeeperClusterSpec) DeepCopyInto(out *KeeperClusterSpec) { in.ContainerTemplate.DeepCopyInto(&out.ContainerTemplate) if in.DataVolumeClaimSpec != nil { in, out := &in.DataVolumeClaimSpec, &out.DataVolumeClaimSpec - *out = new(v1.PersistentVolumeClaimSpec) + *out = new(corev1.PersistentVolumeClaimSpec) (*in).DeepCopyInto(*out) } if in.Labels != nil { @@ -543,6 +549,35 @@ func (in *LoggerConfig) DeepCopy() *LoggerConfig { return out } +// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. +func (in *NetworkPolicySpec) DeepCopyInto(out *NetworkPolicySpec) { + *out = *in + if in.AllowedClients != nil { + in, out := &in.AllowedClients, &out.AllowedClients + *out = make([]v1.NetworkPolicyPeer, len(*in)) + for i := range *in { + (*in)[i].DeepCopyInto(&(*out)[i]) + } + } + if in.MonitoringPeers != nil { + in, out := &in.MonitoringPeers, &out.MonitoringPeers + *out = make([]v1.NetworkPolicyPeer, len(*in)) + for i := range *in { + (*in)[i].DeepCopyInto(&(*out)[i]) + } + } +} + +// DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new NetworkPolicySpec. +func (in *NetworkPolicySpec) DeepCopy() *NetworkPolicySpec { + if in == nil { + return nil + } + out := new(NetworkPolicySpec) + in.DeepCopyInto(out) + return out +} + // DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. func (in *PodDisruptionBudgetSpec) DeepCopyInto(out *PodDisruptionBudgetSpec) { *out = *in @@ -583,14 +618,14 @@ func (in *PodTemplateSpec) DeepCopyInto(out *PodTemplateSpec) { } if in.TopologySpreadConstraints != nil { in, out := &in.TopologySpreadConstraints, &out.TopologySpreadConstraints - *out = make([]v1.TopologySpreadConstraint, len(*in)) + *out = make([]corev1.TopologySpreadConstraint, len(*in)) for i := range *in { (*in)[i].DeepCopyInto(&(*out)[i]) } } if in.ImagePullSecrets != nil { in, out := &in.ImagePullSecrets, &out.ImagePullSecrets - *out = make([]v1.LocalObjectReference, len(*in)) + *out = make([]corev1.LocalObjectReference, len(*in)) copy(*out, *in) } if in.NodeSelector != nil { @@ -602,12 +637,12 @@ func (in *PodTemplateSpec) DeepCopyInto(out *PodTemplateSpec) { } if in.Affinity != nil { in, out := &in.Affinity, &out.Affinity - *out = new(v1.Affinity) + *out = new(corev1.Affinity) (*in).DeepCopyInto(*out) } if in.Tolerations != nil { in, out := &in.Tolerations, &out.Tolerations - *out = make([]v1.Toleration, len(*in)) + *out = make([]corev1.Toleration, len(*in)) for i := range *in { (*in)[i].DeepCopyInto(&(*out)[i]) } @@ -624,14 +659,14 @@ func (in *PodTemplateSpec) DeepCopyInto(out *PodTemplateSpec) { } if in.Volumes != nil { in, out := &in.Volumes, &out.Volumes - *out = make([]v1.Volume, len(*in)) + *out = make([]corev1.Volume, len(*in)) for i := range *in { (*in)[i].DeepCopyInto(&(*out)[i]) } } if in.SecurityContext != nil { in, out := &in.SecurityContext, &out.SecurityContext - *out = new(v1.PodSecurityContext) + *out = new(corev1.PodSecurityContext) (*in).DeepCopyInto(*out) } if in.TopologyZoneKey != nil { @@ -646,7 +681,7 @@ func (in *PodTemplateSpec) DeepCopyInto(out *PodTemplateSpec) { } if in.InitContainers != nil { in, out := &in.InitContainers, &out.InitContainers - *out = make([]v1.Container, len(*in)) + *out = make([]corev1.Container, len(*in)) for i := range *in { (*in)[i].DeepCopyInto(&(*out)[i]) } @@ -713,7 +748,7 @@ func (in *VersionProbeContainer) DeepCopyInto(out *VersionProbeContainer) { in.Resources.DeepCopyInto(&out.Resources) if in.SecurityContext != nil { in, out := &in.SecurityContext, &out.SecurityContext - *out = new(v1.SecurityContext) + *out = new(corev1.SecurityContext) (*in).DeepCopyInto(*out) } } @@ -761,14 +796,14 @@ func (in *VersionProbePodSpec) DeepCopyInto(out *VersionProbePodSpec) { } if in.Tolerations != nil { in, out := &in.Tolerations, &out.Tolerations - *out = make([]v1.Toleration, len(*in)) + *out = make([]corev1.Toleration, len(*in)) for i := range *in { (*in)[i].DeepCopyInto(&(*out)[i]) } } if in.SecurityContext != nil { in, out := &in.SecurityContext, &out.SecurityContext - *out = new(v1.PodSecurityContext) + *out = new(corev1.PodSecurityContext) (*in).DeepCopyInto(*out) } if in.Containers != nil { diff --git a/internal/controller/clickhouse/controller.go b/internal/controller/clickhouse/controller.go index aebdd3c8..8c7e1eee 100644 --- a/internal/controller/clickhouse/controller.go +++ b/internal/controller/clickhouse/controller.go @@ -25,6 +25,7 @@ import ( "github.com/ClickHouse/clickhouse-operator/internal/controllerutil" "github.com/ClickHouse/clickhouse-operator/internal/upgrade" webhookv1 "github.com/ClickHouse/clickhouse-operator/internal/webhook/v1alpha1" + networkingv1 "k8s.io/api/networking/v1" ) // ClusterController reconciles a ClickHouseCluster object. @@ -62,6 +63,7 @@ func keeperReferenceFieldValue(cluster *v1.ClickHouseCluster) []string { // +kubebuilder:rbac:groups=policy,resources=poddisruptionbudgets,verbs=get;list;watch;create;update;delete // +kubebuilder:rbac:groups=events.k8s.io,resources=events,verbs=create;patch // +kubebuilder:rbac:groups=batch,resources=jobs,verbs=get;list;watch;create;delete +// +kubebuilder:rbac:groups=networking.k8s.io,resources=networkpolicies,verbs=get;list;watch;create;update;delete // Reconcile is part of the main kubernetes reconciliation loop which aims to // move the current state of the cluster closer to the desired state. @@ -189,7 +191,8 @@ func SetupWithManager(mgr ctrl.Manager, log controllerutil.Logger, checker *upgr Owns(&corev1.Secret{}). Owns(&corev1.Service{}). Owns(&corev1.Pod{}). - Owns(&batchv1.Job{}) + Owns(&batchv1.Job{}). + Owns(&networkingv1.NetworkPolicy{}) if enablePDB { controllerBuilder = controllerBuilder.Owns(&policyv1.PodDisruptionBudget{}) diff --git a/internal/controller/clickhouse/networkpolicy.go b/internal/controller/clickhouse/networkpolicy.go new file mode 100644 index 00000000..aee465ef --- /dev/null +++ b/internal/controller/clickhouse/networkpolicy.go @@ -0,0 +1,74 @@ +package clickhouse + +import ( + corev1 "k8s.io/api/core/v1" + networkingv1 "k8s.io/api/networking/v1" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/util/intstr" + + v1 "github.com/ClickHouse/clickhouse-operator/api/v1alpha1" + "github.com/ClickHouse/clickhouse-operator/internal/controllerutil" +) + +func templateNetworkPolicy(cr *v1.ClickHouseCluster) *networkingv1.NetworkPolicy { + app := cr.SpecificName() + + tcp := func(port int32) networkingv1.NetworkPolicyPort { + proto := corev1.ProtocolTCP + p := intstr.FromInt32(port) + + return networkingv1.NetworkPolicyPort{Protocol: &proto, Port: &p} + } + + self := networkingv1.NetworkPolicyPeer{ + PodSelector: &metav1.LabelSelector{ + MatchLabels: map[string]string{controllerutil.LabelAppKey: app}, + }, + } + + clusterNamespace := networkingv1.NetworkPolicyPeer{ + NamespaceSelector: &metav1.LabelSelector{ + MatchLabels: map[string]string{"kubernetes.io/metadata.name": cr.Namespace}, + }, + } + + ingress := []networkingv1.NetworkPolicyIngressRule{ + { + From: []networkingv1.NetworkPolicyPeer{self}, + Ports: []networkingv1.NetworkPolicyPort{tcp(PortInterserver), tcp(PortNative)}, + }, + { + From: append([]networkingv1.NetworkPolicyPeer{clusterNamespace}, cr.Spec.NetworkPolicy.AllowedClients...), + Ports: []networkingv1.NetworkPolicyPort{tcp(PortNative), tcp(PortHTTP)}, + }, + } + + if peers := cr.Spec.NetworkPolicy.MonitoringPeers; len(peers) > 0 { + ingress = append(ingress, networkingv1.NetworkPolicyIngressRule{ + From: peers, + Ports: []networkingv1.NetworkPolicyPort{tcp(PortPrometheusScrape)}, + }) + } + + return &networkingv1.NetworkPolicy{ + TypeMeta: metav1.TypeMeta{ + Kind: "NetworkPolicy", + APIVersion: "networking.k8s.io/v1", + }, + ObjectMeta: metav1.ObjectMeta{ + Name: app, + Namespace: cr.Namespace, + Labels: controllerutil.MergeMaps(cr.Spec.Labels, map[string]string{ + controllerutil.LabelAppKey: app, + }), + Annotations: controllerutil.MergeMaps(cr.Spec.Annotations), + }, + Spec: networkingv1.NetworkPolicySpec{ + PodSelector: metav1.LabelSelector{ + MatchLabels: map[string]string{controllerutil.LabelAppKey: app}, + }, + PolicyTypes: []networkingv1.PolicyType{networkingv1.PolicyTypeIngress}, + Ingress: ingress, + }, + } +} diff --git a/internal/controller/clickhouse/sync.go b/internal/controller/clickhouse/sync.go index 010917da..1609254c 100644 --- a/internal/controller/clickhouse/sync.go +++ b/internal/controller/clickhouse/sync.go @@ -12,6 +12,7 @@ import ( appsv1 "k8s.io/api/apps/v1" corev1 "k8s.io/api/core/v1" + networkingv1 "k8s.io/api/networking/v1" policyv1 "k8s.io/api/policy/v1" k8serrors "k8s.io/apimachinery/pkg/api/errors" "k8s.io/apimachinery/pkg/api/meta" @@ -131,6 +132,7 @@ func (r *clickhouseReconciler) sync(ctx context.Context, log ctrlutil.Logger) (c {Name: "ExternalSecret", Fn: r.reconcileExternalSecret, Always: true}, {Name: "ActiveReplicaStatus", Fn: r.reconcileActiveReplicaStatus, Always: true}, {Name: "ClusterRevisions", Fn: r.reconcileClusterRevisions, Always: true}, + {Name: "NetworkPolicy", Fn: r.reconcileNetworkPolicy, Always: true}, {Name: "ReplicaResources", Fn: r.reconcileReplicaResources}, {Name: "DatabaseSync", Fn: r.reconcileDatabaseSync}, {Name: "CleanUp", Fn: r.reconcileCleanUp}, @@ -879,6 +881,38 @@ func (r *clickhouseReconciler) reconcileCleanUp(ctx context.Context, log ctrluti return chctrl.StepContinue(), nil } +func (r *clickhouseReconciler) reconcileNetworkPolicy(ctx context.Context, log ctrlutil.Logger) (chctrl.StepResult, error) { + np := r.Cluster.Spec.NetworkPolicy + + if np != nil && np.Enabled { + desired := templateNetworkPolicy(r.Cluster) + if _, err := r.ReconcileResource(ctx, log, desired, []string{"Spec"}, v1.EventActionReconciling); err != nil { + return chctrl.StepResult{}, fmt.Errorf("reconcile NetworkPolicy: %w", err) + } + + return chctrl.StepContinue(), nil + } + + existing := &networkingv1.NetworkPolicy{} + err := r.GetClient().Get(ctx, types.NamespacedName{ + Namespace: r.Cluster.Namespace, + Name: r.Cluster.SpecificName(), + }, existing) + if k8serrors.IsNotFound(err) { + return chctrl.StepContinue(), nil + } + + if err != nil { + return chctrl.StepResult{}, fmt.Errorf("get NetworkPolicy: %w", err) + } + + if err := r.Delete(ctx, existing, v1.EventActionReconciling); err != nil { + return chctrl.StepResult{}, fmt.Errorf("delete NetworkPolicy: %w", err) + } + + return chctrl.StepContinue(), nil +} + func (r *clickhouseReconciler) evaluateReplicaConditions() { var ( errorIDs, notReadyIDs []string diff --git a/test/e2e/networkpolicy_e2e_test.go b/test/e2e/networkpolicy_e2e_test.go new file mode 100644 index 00000000..8289e65c --- /dev/null +++ b/test/e2e/networkpolicy_e2e_test.go @@ -0,0 +1,172 @@ +package e2e + +import ( + "context" + "fmt" + "math/rand" + "os" + "time" + + . "github.com/onsi/ginkgo/v2" + . "github.com/onsi/gomega" + corev1 "k8s.io/api/core/v1" + networkingv1 "k8s.io/api/networking/v1" + k8serrors "k8s.io/apimachinery/pkg/api/errors" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/types" + "sigs.k8s.io/controller-runtime/pkg/client" + + v1 "github.com/ClickHouse/clickhouse-operator/api/v1alpha1" + "github.com/ClickHouse/clickhouse-operator/internal/controllerutil" +) + +var _ = Describe("ClickHouse NetworkPolicy", Label("clickhouse"), func() { + var ( + ns string + keeper v1.KeeperCluster + ) + + BeforeEach(func(ctx context.Context) { + ns = testNamespace(ctx) + + keeper = v1.KeeperCluster{ + ObjectMeta: metav1.ObjectMeta{ + Namespace: ns, + Name: fmt.Sprintf("np-keeper-%d", rand.Uint32()), //nolint:gosec + }, + Spec: v1.KeeperClusterSpec{ + Replicas: new(int32(1)), + DataVolumeClaimSpec: &defaultStorage, + }, + } + Expect(k8sClient.Create(ctx, &keeper)).To(Succeed()) + DeferCleanup(func(ctx context.Context) { + Expect(k8sClient.Delete(ctx, &keeper)).To(Succeed()) + }) + + WaitKeeperUpdatedAndReady(ctx, &keeper, 2*time.Minute, false) + }) + + It("manages and enforces the cluster NetworkPolicy", func(ctx context.Context) { + cr := v1.ClickHouseCluster{ + ObjectMeta: metav1.ObjectMeta{ + Namespace: ns, + Name: fmt.Sprintf("np-%d", rand.Uint32()), //nolint:gosec + }, + Spec: v1.ClickHouseClusterSpec{ + Replicas: new(int32(1)), + ContainerTemplate: v1.ContainerTemplateSpec{Image: v1.ContainerImage{Tag: BaseVersion}}, + DataVolumeClaimSpec: &defaultStorage, + KeeperClusterRef: v1.KeeperClusterReference{Name: keeper.Name}, + NetworkPolicy: &v1.NetworkPolicySpec{ + Enabled: true, + // Pods labelled role=allowed (in this namespace) may reach the client ports. + AllowedClients: []networkingv1.NetworkPolicyPeer{{ + PodSelector: &metav1.LabelSelector{MatchLabels: map[string]string{"role": "allowed"}}, + }}, + }, + }, + } + + By("creating cluster CR with networkPolicy enabled") + Expect(k8sClient.Create(ctx, &cr)).To(Succeed()) + DeferCleanup(func(ctx context.Context) { Expect(k8sClient.Delete(ctx, &cr)).To(Succeed()) }) + + npKey := types.NamespacedName{Namespace: ns, Name: cr.SpecificName()} + + By("operator creates the NetworkPolicy with the expected shape") + Eventually(func(g Gomega) { + var np networkingv1.NetworkPolicy + g.Expect(k8sClient.Get(ctx, npKey, &np)).To(Succeed()) + g.Expect(np.Spec.PodSelector.MatchLabels).To(HaveKeyWithValue(controllerutil.LabelAppKey, cr.SpecificName())) + g.Expect(np.Spec.PolicyTypes).To(Equal([]networkingv1.PolicyType{networkingv1.PolicyTypeIngress})) + g.Expect(np.Spec.Ingress).To(HaveLen(2)) // intra-cluster + clients + g.Expect(np.OwnerReferences).To(HaveLen(1)) + g.Expect(np.OwnerReferences[0].Name).To(Equal(cr.Name)) + }).WithTimeout(2 * time.Minute).WithPolling(pollingInterval).Should(Succeed()) + + By("waiting until the cluster is ready") + WaitClickHouseUpdatedAndReady(ctx, &cr, 3*time.Minute, false) + + By("resolving the ClickHouse pod IP") + var pods corev1.PodList + Expect(k8sClient.List(ctx, &pods, + client.InNamespace(ns), + client.MatchingLabels{controllerutil.LabelAppKey: cr.SpecificName()}, + )).To(Succeed()) + Expect(pods.Items).NotTo(BeEmpty()) + targetIP := pods.Items[0].Status.PodIP + Expect(targetIP).NotTo(BeEmpty()) + + // Positive check: a client matching AllowedClients reaches the native port. + // This holds with or without an enforcing CNI, so it always runs. + By("a client matching AllowedClients (role=allowed) can reach the native port") + allowed := runConnectivityProbe(ctx, ns, "probe-allowed", targetIP, 9000, map[string]string{"role": "allowed"}) + Eventually(probePhase(ctx, allowed)). + WithTimeout(time.Minute).WithPolling(pollingInterval). + Should(Equal(corev1.PodSucceeded)) + + // Negative checks: ports that must stay closed even for an allowed client. + // The cluster namespace is always permitted on the client ports (operator access), + // so in-namespace probes can only be blocked on ports outside that allowance. + // Blocking only happens under a NetworkPolicy-enforcing CNI (kindnet ignores policies). + if os.Getenv("NP_ENFORCING_CNI") != "" { + cases := []struct { + name string + port int + }{ + {name: "metrics port is closed without monitoringPeers", port: 9363}, + {name: "interserver port is not open to clients", port: 9009}, + } + + for i, tc := range cases { + By("blocked: " + tc.name) + + probe := runConnectivityProbe(ctx, ns, fmt.Sprintf("probe-block-%d", i), targetIP, tc.port, map[string]string{"role": "allowed"}) + Eventually(probePhase(ctx, probe)). + WithTimeout(time.Minute).WithPolling(pollingInterval). + Should(Equal(corev1.PodFailed)) // nc -w3 times out -> non-zero exit -> Failed + } + } + + By("disabling networkPolicy removes the object") + Expect(k8sClient.Get(ctx, cr.NamespacedName(), &cr)).To(Succeed()) + cr.Spec.NetworkPolicy.Enabled = false + Expect(k8sClient.Update(ctx, &cr)).To(Succeed()) + + Eventually(func() bool { + var np networkingv1.NetworkPolicy + return k8serrors.IsNotFound(k8sClient.Get(ctx, npKey, &np)) + }).WithTimeout(time.Minute).WithPolling(pollingInterval).Should(BeTrue()) + }) +}) + +func runConnectivityProbe(ctx context.Context, ns, name, targetIP string, port int, labels map[string]string) *corev1.Pod { + GinkgoHelper() + + pod := &corev1.Pod{ + ObjectMeta: metav1.ObjectMeta{Namespace: ns, Name: name, Labels: labels}, + Spec: corev1.PodSpec{ + RestartPolicy: corev1.RestartPolicyNever, + Containers: []corev1.Container{{ + Name: "probe", + Image: "busybox:1.36", + Command: []string{"sh", "-c", fmt.Sprintf("nc -z -w3 %s %d", targetIP, port)}, + }}, + }, + } + + Expect(k8sClient.Create(ctx, pod)).To(Succeed()) + DeferCleanup(func(ctx context.Context) { _ = k8sClient.Delete(ctx, pod) }) + + return pod +} + +func probePhase(ctx context.Context, pod *corev1.Pod) func(Gomega) corev1.PodPhase { + return func(g Gomega) corev1.PodPhase { + var p corev1.Pod + g.Expect(k8sClient.Get(ctx, client.ObjectKeyFromObject(pod), &p)).To(Succeed()) + + return p.Status.Phase + } +}