From 3471c53b80c8411e0c5f76810b3b5f8fc95c72b6 Mon Sep 17 00:00:00 2001 From: Konstantin Vedernikov Date: Mon, 1 Jun 2026 13:51:09 +0000 Subject: [PATCH 01/12] start --- api/v1alpha1/events.go | 5 ++ internal/controller/clickhouse/commands.go | 24 ++++++++ internal/controller/clickhouse/sync.go | 56 ++++++++++++++++- internal/controller/constants.go | 1 + test/e2e/clickhouse_e2e_test.go | 72 ++++++++++++++++++++++ 5 files changed, 157 insertions(+), 1 deletion(-) diff --git a/api/v1alpha1/events.go b/api/v1alpha1/events.go index 83689fa7..c4ec1b66 100644 --- a/api/v1alpha1/events.go +++ b/api/v1alpha1/events.go @@ -38,6 +38,11 @@ const ( EventReasonUpgradeAvailable EventReason = "VersionUpgradeAvailable" ) +// ClickHouse warning event +const ( + EventWarningRecord EventReason = "ClickHouseWarning" +) + // EventAction represents the action associated with an event. type EventAction = string diff --git a/internal/controller/clickhouse/commands.go b/internal/controller/clickhouse/commands.go index b2838520..2f58562c 100644 --- a/internal/controller/clickhouse/commands.go +++ b/internal/controller/clickhouse/commands.go @@ -119,6 +119,30 @@ func (cmd *commander) Probe(ctx context.Context, id v1.ClickHouseReplicaID) (rep return probe, nil } +// Reads system warnings from the server +func (cmd *commander) Warnings(ctx context.Context, id v1.ClickHouseReplicaID) ([]string, error) { + conn, err := cmd.getConn(id) + if err != nil { + return []string{}, fmt.Errorf("failed to get connection for replica %s: %w", id, err) + } + + warnings := []string{} + + rows, err := conn.Query(ctx, + "SELECT message FROM system.warnings", + ) + + for rows.Next() { + var raw string + if err := rows.Scan(&raw); err != nil { + return []string{}, fmt.Errorf("probe replica %s: %w", id, err) + } + warnings = append(warnings, raw) + } + + return warnings, nil +} + // ReloadConfig queries the replica to reload its configuration. func (cmd *commander) ReloadConfig(ctx context.Context, id v1.ClickHouseReplicaID) error { conn, err := cmd.getConn(id) diff --git a/internal/controller/clickhouse/sync.go b/internal/controller/clickhouse/sync.go index 010917da..5b16092e 100644 --- a/internal/controller/clickhouse/sync.go +++ b/internal/controller/clickhouse/sync.go @@ -4,6 +4,7 @@ import ( "cmp" "context" "fmt" + "hash/fnv" "maps" "slices" "strconv" @@ -130,6 +131,7 @@ func (r *clickhouseReconciler) sync(ctx context.Context, log ctrlutil.Logger) (c {Name: "ClusterSecret", Fn: r.reconcileClusterSecret, Always: true}, {Name: "ExternalSecret", Fn: r.reconcileExternalSecret, Always: true}, {Name: "ActiveReplicaStatus", Fn: r.reconcileActiveReplicaStatus, Always: true}, + {Name: "Warnings", Fn: r.reconcileWarnings, Always: true}, {Name: "ClusterRevisions", Fn: r.reconcileClusterRevisions, Always: true}, {Name: "ReplicaResources", Fn: r.reconcileReplicaResources}, {Name: "DatabaseSync", Fn: r.reconcileDatabaseSync}, @@ -508,10 +510,62 @@ func (r *clickhouseReconciler) reconcileActiveReplicaStatus(ctx context.Context, } r.evaluateReplicaConditions() - return chctrl.StepContinue(), nil } +func warningAction(msg string) string { + h := fnv.New32a() + _, _ = h.Write([]byte(msg)) + return fmt.Sprintf("Warning-%08x", h.Sum32()) +} + +func (r *clickhouseReconciler) reconcileWarnings(ctx context.Context, log ctrlutil.Logger) (chctrl.StepResult, error) { + listOpts := ctrlutil.AppRequirements(r.Cluster.Namespace, r.Cluster.SpecificName()) + + var statefulSets appsv1.StatefulSetList + if err := r.GetClient().List(ctx, &statefulSets, listOpts); err != nil { + return chctrl.StepResult{}, fmt.Errorf("list StatefulSets: %w", err) + } + + ctrlutil.ExecuteParallel(statefulSets.Items, func(sts appsv1.StatefulSet) (v1.ClickHouseReplicaID, []string, error) { + id, err := v1.ClickHouseIDFromLabels(sts.Labels) + if err != nil { + log.Error(err, "get replica ID from StatefulSet labels", "statefulset", sts.Name) + return v1.ClickHouseReplicaID{}, []string{}, fmt.Errorf("get replica ID from StatefulSet labels: %w", err) + } + + hasError, err := chctrl.CheckPodError(ctx, log, r.GetClient(), &sts) + if err != nil { + log.Warn("failed to check replica pod error", "statefulset", sts.Name, "error", err) + + hasError = true + } + + var warnings []string + + if !hasError && sts.Status.ReadyReplicas > 0 && r.commander != nil { + ctx, cancel := context.WithTimeout(ctx, chctrl.LoadReplicaStateTimeout) + defer cancel() + + warnings, err := r.commander.Warnings(ctx, id) + if err != nil { + log.Debug("failed to get warnings from replica", id, " error", err) + } + + log.Info("system.warnings fetched", "replica_id", id, "count", len(warnings)) + + for _, warning := range warnings { + r.GetRecorder().Eventf(r.Cluster, nil, corev1.EventTypeWarning, + v1.EventWarningRecord, warningAction(warning), + "Replica %s: %s", r.Cluster.HostnameByID(id), warning) + } + } + return id, warnings, nil + }) + + return chctrl.StepRequeue(chctrl.WarningsPollInterval), nil +} + func (r *clickhouseReconciler) reconcileClusterRevisions(ctx context.Context, log ctrlutil.Logger) (chctrl.StepResult, error) { if r.Cluster.Status.ObservedGeneration != r.Cluster.Generation { r.Cluster.Status.ObservedGeneration = r.Cluster.Generation diff --git a/internal/controller/constants.go b/internal/controller/constants.go index 9dd04c33..91147dbd 100644 --- a/internal/controller/constants.go +++ b/internal/controller/constants.go @@ -9,6 +9,7 @@ import ( const ( RequeueOnRefreshTimeout = time.Second LoadReplicaStateTimeout = 10 * time.Second + WarningsPollInterval = 30 * time.Second TLSFileMode int32 = 0444 DefaultUser int64 = 101 ) diff --git a/test/e2e/clickhouse_e2e_test.go b/test/e2e/clickhouse_e2e_test.go index 0e10b3c0..c7163928 100644 --- a/test/e2e/clickhouse_e2e_test.go +++ b/test/e2e/clickhouse_e2e_test.go @@ -172,6 +172,78 @@ var _ = Describe("ClickHouse controller", Label("clickhouse"), func() { Entry("scale down to 2 replicas", 3, v1.ClickHouseClusterSpec{Replicas: new(int32(2))}), ) + It("should surface system.warnings as ClickHouseWarning events", func(ctx context.Context) { + cr := v1.ClickHouseCluster{ + ObjectMeta: metav1.ObjectMeta{ + Namespace: ns, + Name: fmt.Sprintf("warnings-%d", rand.Uint32()), //nolint:gosec + }, + Spec: v1.ClickHouseClusterSpec{ + Replicas: new(int32(1)), + ContainerTemplate: v1.ContainerTemplateSpec{ + Image: v1.ContainerImage{Tag: BaseVersion}, + }, + DataVolumeClaimSpec: &defaultStorage, + KeeperClusterRef: v1.KeeperClusterReference{Name: keeper.Name}, + Settings: v1.ClickHouseSettings{ + ExtraConfig: runtime.RawExtension{Raw: []byte(`{"max_table_num_to_warn": 1}`)}, + }, + }, + } + + By("creating cluster CR") + Expect(k8sClient.Create(ctx, &cr)).To(Succeed()) + DeferCleanup(func(ctx context.Context) { + By("deleting cluster CR") + Expect(k8sClient.Delete(ctx, &cr)).To(Succeed()) + }) + WaitClickHouseUpdatedAndReady(ctx, &cr, 2*time.Minute, false) + + By("creating enough tables to exceed max_table_num_to_warn") + chClient, err := testutil.NewClickHouseClient(ctx, podDialer, &cr) + Expect(err).NotTo(HaveOccurred()) + defer chClient.Close() + + for i := range 2 { + Expect(chClient.Exec(ctx, fmt.Sprintf( + "CREATE TABLE IF NOT EXISTS default.warn_t%d (x UInt64) ENGINE = MergeTree ORDER BY x", i))).To(Succeed()) + } + + const tableWarning = "The number of attached tables" + + By("confirming the table-count warning is present in system.warnings table") + Eventually(func(g Gomega) { + rows, err := chClient.Query(ctx, "SELECT message FROM system.warnings") + g.Expect(err).NotTo(HaveOccurred()) + defer func() { _ = rows.Close() }() + + var warnings []string + for rows.Next() { + var m string + g.Expect(rows.Scan(&m)).To(Succeed()) + warnings = append(warnings, m) + } + g.Expect(rows.Err()).NotTo(HaveOccurred()) + g.Expect(warnings).To(ContainElement(ContainSubstring(tableWarning)), + "expected the table-count warning in system.warnings, got: %v", warnings) + }).WithPolling(pollingInterval).WithTimeout(time.Minute).Should(Succeed()) + + By("asserting the operator surfaced it as a ClickHouseWarning event") + Eventually(func(g Gomega) { + var events corev1.EventList + g.Expect(k8sClient.List(ctx, &events, client.InNamespace(ns))).To(Succeed()) + + var seen []string + for _, e := range events.Items { + if e.Reason == v1.EventWarningRecord && e.InvolvedObject.Name == cr.Name { + seen = append(seen, e.Message) + } + } + g.Expect(seen).To(ContainElement(ContainSubstring(tableWarning)), + "no ClickHouseWarning event carrying %q; events seen: %v", tableWarning, seen) + }).WithPolling(pollingInterval).WithTimeout(2 * time.Minute).Should(Succeed()) + }) + It("should work with custom data folder mount", func(ctx context.Context) { cr := v1.ClickHouseCluster{ ObjectMeta: metav1.ObjectMeta{ From 32e49797baf9f722ba9b45e60dbd8766cbe1d08c Mon Sep 17 00:00:00 2001 From: Konstantin Vedernikov Date: Mon, 1 Jun 2026 13:56:49 +0000 Subject: [PATCH 02/12] less collision --- internal/controller/clickhouse/sync.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/internal/controller/clickhouse/sync.go b/internal/controller/clickhouse/sync.go index 5b16092e..5b5c8bb8 100644 --- a/internal/controller/clickhouse/sync.go +++ b/internal/controller/clickhouse/sync.go @@ -514,9 +514,9 @@ func (r *clickhouseReconciler) reconcileActiveReplicaStatus(ctx context.Context, } func warningAction(msg string) string { - h := fnv.New32a() + h := fnv.New64a() _, _ = h.Write([]byte(msg)) - return fmt.Sprintf("Warning-%08x", h.Sum32()) + return fmt.Sprintf("Warning-%016x", h.Sum64()) } func (r *clickhouseReconciler) reconcileWarnings(ctx context.Context, log ctrlutil.Logger) (chctrl.StepResult, error) { From c68f6770f31544b45244af1589d9e80ba0aed4c7 Mon Sep 17 00:00:00 2001 From: Konstantin Vedernikov Date: Mon, 1 Jun 2026 14:19:39 +0000 Subject: [PATCH 03/12] fix lint --- api/v1alpha1/events.go | 2 +- internal/controller/clickhouse/commands.go | 19 ++++++++++++++----- internal/controller/clickhouse/sync.go | 2 ++ test/e2e/clickhouse_e2e_test.go | 5 +++++ 4 files changed, 22 insertions(+), 6 deletions(-) diff --git a/api/v1alpha1/events.go b/api/v1alpha1/events.go index c4ec1b66..65e779ec 100644 --- a/api/v1alpha1/events.go +++ b/api/v1alpha1/events.go @@ -38,7 +38,7 @@ const ( EventReasonUpgradeAvailable EventReason = "VersionUpgradeAvailable" ) -// ClickHouse warning event +// ClickHouse warning event. const ( EventWarningRecord EventReason = "ClickHouseWarning" ) diff --git a/internal/controller/clickhouse/commands.go b/internal/controller/clickhouse/commands.go index 2f58562c..3041050d 100644 --- a/internal/controller/clickhouse/commands.go +++ b/internal/controller/clickhouse/commands.go @@ -119,7 +119,7 @@ func (cmd *commander) Probe(ctx context.Context, id v1.ClickHouseReplicaID) (rep return probe, nil } -// Reads system warnings from the server +// Reads system warnings from the server. func (cmd *commander) Warnings(ctx context.Context, id v1.ClickHouseReplicaID) ([]string, error) { conn, err := cmd.getConn(id) if err != nil { @@ -128,18 +128,27 @@ func (cmd *commander) Warnings(ctx context.Context, id v1.ClickHouseReplicaID) ( warnings := []string{} - rows, err := conn.Query(ctx, - "SELECT message FROM system.warnings", - ) + rows, err := conn.Query(ctx, "SELECT message FROM system.warnings") + if err != nil { + return []string{}, fmt.Errorf("failed to query system.warnings on replica %s: %w", id, err) + } + defer func() { + _ = rows.Close() + }() for rows.Next() { var raw string if err := rows.Scan(&raw); err != nil { - return []string{}, fmt.Errorf("probe replica %s: %w", id, err) + return []string{}, fmt.Errorf("failed to get data from system.warnings: replica %s, %w", id, err) } + warnings = append(warnings, raw) } + if err := rows.Err(); err != nil { + return []string{}, fmt.Errorf("failed to scan data from system.warnings %s: %w", id, err) + } + return warnings, nil } diff --git a/internal/controller/clickhouse/sync.go b/internal/controller/clickhouse/sync.go index 5b5c8bb8..1d8d85e9 100644 --- a/internal/controller/clickhouse/sync.go +++ b/internal/controller/clickhouse/sync.go @@ -510,6 +510,7 @@ func (r *clickhouseReconciler) reconcileActiveReplicaStatus(ctx context.Context, } r.evaluateReplicaConditions() + return chctrl.StepContinue(), nil } @@ -560,6 +561,7 @@ func (r *clickhouseReconciler) reconcileWarnings(ctx context.Context, log ctrlut "Replica %s: %s", r.Cluster.HostnameByID(id), warning) } } + return id, warnings, nil }) diff --git a/test/e2e/clickhouse_e2e_test.go b/test/e2e/clickhouse_e2e_test.go index c7163928..32fc84a2 100644 --- a/test/e2e/clickhouse_e2e_test.go +++ b/test/e2e/clickhouse_e2e_test.go @@ -200,8 +200,10 @@ var _ = Describe("ClickHouse controller", Label("clickhouse"), func() { WaitClickHouseUpdatedAndReady(ctx, &cr, 2*time.Minute, false) By("creating enough tables to exceed max_table_num_to_warn") + chClient, err := testutil.NewClickHouseClient(ctx, podDialer, &cr) Expect(err).NotTo(HaveOccurred()) + defer chClient.Close() for i := range 2 { @@ -215,6 +217,7 @@ var _ = Describe("ClickHouse controller", Label("clickhouse"), func() { Eventually(func(g Gomega) { rows, err := chClient.Query(ctx, "SELECT message FROM system.warnings") g.Expect(err).NotTo(HaveOccurred()) + defer func() { _ = rows.Close() }() var warnings []string @@ -223,6 +226,7 @@ var _ = Describe("ClickHouse controller", Label("clickhouse"), func() { g.Expect(rows.Scan(&m)).To(Succeed()) warnings = append(warnings, m) } + g.Expect(rows.Err()).NotTo(HaveOccurred()) g.Expect(warnings).To(ContainElement(ContainSubstring(tableWarning)), "expected the table-count warning in system.warnings, got: %v", warnings) @@ -239,6 +243,7 @@ var _ = Describe("ClickHouse controller", Label("clickhouse"), func() { seen = append(seen, e.Message) } } + g.Expect(seen).To(ContainElement(ContainSubstring(tableWarning)), "no ClickHouseWarning event carrying %q; events seen: %v", tableWarning, seen) }).WithPolling(pollingInterval).WithTimeout(2 * time.Minute).Should(Succeed()) From 8d0ede1e35d59a67f0255fd72dd7e0ca1247761c Mon Sep 17 00:00:00 2001 From: "copilot-swe-agent[bot]" <198982749+Copilot@users.noreply.github.com> Date: Mon, 1 Jun 2026 16:27:56 +0000 Subject: [PATCH 04/12] test: increase cert-manager webhook wait timeout in e2e setup --- test/testutil/utils.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/test/testutil/utils.go b/test/testutil/utils.go index ce552a89..137db3a2 100644 --- a/test/testutil/utils.go +++ b/test/testutil/utils.go @@ -134,7 +134,7 @@ func InstallCertManager(ctx context.Context) error { cmd = exec.CommandContext(ctx, "kubectl", "wait", "deployment.apps/cert-manager-webhook", "--for", "condition=Available", "--namespace", "cert-manager", - "--timeout", "5m", + "--timeout", "10m", ) _, err := Run(cmd) From ab230ede64f9a644c06500e8e6870feb095d2cc5 Mon Sep 17 00:00:00 2001 From: "copilot-swe-agent[bot]" <198982749+Copilot@users.noreply.github.com> Date: Mon, 1 Jun 2026 16:46:37 +0000 Subject: [PATCH 05/12] Fix variable shadowing and odd key-value pairs in reconcileWarnings logging --- internal/controller/clickhouse/sync.go | 7 ++++--- 1 file changed, 4 insertions(+), 3 deletions(-) diff --git a/internal/controller/clickhouse/sync.go b/internal/controller/clickhouse/sync.go index 1d8d85e9..06605d43 100644 --- a/internal/controller/clickhouse/sync.go +++ b/internal/controller/clickhouse/sync.go @@ -548,9 +548,10 @@ func (r *clickhouseReconciler) reconcileWarnings(ctx context.Context, log ctrlut ctx, cancel := context.WithTimeout(ctx, chctrl.LoadReplicaStateTimeout) defer cancel() - warnings, err := r.commander.Warnings(ctx, id) - if err != nil { - log.Debug("failed to get warnings from replica", id, " error", err) + var wErr error + warnings, wErr = r.commander.Warnings(ctx, id) + if wErr != nil { + log.Debug("failed to get warnings from replica", "replica_id", id, "error", wErr) } log.Info("system.warnings fetched", "replica_id", id, "count", len(warnings)) From d3cb268ecd1836a373b08b3ba699ba99cf90261f Mon Sep 17 00:00:00 2001 From: Konstantin Vedernikov <75157521+scanhex12@users.noreply.github.com> Date: Mon, 1 Jun 2026 18:50:49 +0200 Subject: [PATCH 06/12] Potential fix for pull request finding Co-authored-by: Copilot Autofix powered by AI <175728472+Copilot@users.noreply.github.com> --- api/v1alpha1/events.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/api/v1alpha1/events.go b/api/v1alpha1/events.go index 65e779ec..959b2e0f 100644 --- a/api/v1alpha1/events.go +++ b/api/v1alpha1/events.go @@ -40,7 +40,7 @@ const ( // ClickHouse warning event. const ( - EventWarningRecord EventReason = "ClickHouseWarning" + EventReasonClickHouseWarning EventReason = "ClickHouseWarning" ) // EventAction represents the action associated with an event. From 69cefdc8fbf942cb4bd00ee072fe176edfe0f907 Mon Sep 17 00:00:00 2001 From: "copilot-swe-agent[bot]" <198982749+Copilot@users.noreply.github.com> Date: Mon, 1 Jun 2026 16:54:05 +0000 Subject: [PATCH 07/12] fix: add missing blank line in reconcileWarnings to satisfy wsl_v5 linter --- internal/controller/clickhouse/sync.go | 1 + 1 file changed, 1 insertion(+) diff --git a/internal/controller/clickhouse/sync.go b/internal/controller/clickhouse/sync.go index 06605d43..6419db09 100644 --- a/internal/controller/clickhouse/sync.go +++ b/internal/controller/clickhouse/sync.go @@ -549,6 +549,7 @@ func (r *clickhouseReconciler) reconcileWarnings(ctx context.Context, log ctrlut defer cancel() var wErr error + warnings, wErr = r.commander.Warnings(ctx, id) if wErr != nil { log.Debug("failed to get warnings from replica", "replica_id", id, "error", wErr) From 562df98f1e1042c88f4ea1568a13104167771228 Mon Sep 17 00:00:00 2001 From: "copilot-swe-agent[bot]" <198982749+Copilot@users.noreply.github.com> Date: Mon, 1 Jun 2026 16:55:10 +0000 Subject: [PATCH 08/12] fix: use correct event constant and idiomatic slice init in reconcileWarnings --- internal/controller/clickhouse/commands.go | 2 +- internal/controller/clickhouse/sync.go | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/internal/controller/clickhouse/commands.go b/internal/controller/clickhouse/commands.go index 3041050d..17b4cbdc 100644 --- a/internal/controller/clickhouse/commands.go +++ b/internal/controller/clickhouse/commands.go @@ -126,7 +126,7 @@ func (cmd *commander) Warnings(ctx context.Context, id v1.ClickHouseReplicaID) ( return []string{}, fmt.Errorf("failed to get connection for replica %s: %w", id, err) } - warnings := []string{} + var warnings []string rows, err := conn.Query(ctx, "SELECT message FROM system.warnings") if err != nil { diff --git a/internal/controller/clickhouse/sync.go b/internal/controller/clickhouse/sync.go index 6419db09..361c70dd 100644 --- a/internal/controller/clickhouse/sync.go +++ b/internal/controller/clickhouse/sync.go @@ -559,7 +559,7 @@ func (r *clickhouseReconciler) reconcileWarnings(ctx context.Context, log ctrlut for _, warning := range warnings { r.GetRecorder().Eventf(r.Cluster, nil, corev1.EventTypeWarning, - v1.EventWarningRecord, warningAction(warning), + v1.EventReasonClickHouseWarning, warningAction(warning), "Replica %s: %s", r.Cluster.HostnameByID(id), warning) } } From 750a2a921617df46e419478265549e90eb8fbeac Mon Sep 17 00:00:00 2001 From: "copilot-swe-agent[bot]" <198982749+Copilot@users.noreply.github.com> Date: Mon, 1 Jun 2026 16:56:09 +0000 Subject: [PATCH 09/12] fix: replace remaining EventWarningRecord references and use nil returns in commands.go --- internal/controller/clickhouse/commands.go | 8 ++++---- test/e2e/clickhouse_e2e_test.go | 2 +- 2 files changed, 5 insertions(+), 5 deletions(-) diff --git a/internal/controller/clickhouse/commands.go b/internal/controller/clickhouse/commands.go index 17b4cbdc..92b42c81 100644 --- a/internal/controller/clickhouse/commands.go +++ b/internal/controller/clickhouse/commands.go @@ -123,14 +123,14 @@ func (cmd *commander) Probe(ctx context.Context, id v1.ClickHouseReplicaID) (rep func (cmd *commander) Warnings(ctx context.Context, id v1.ClickHouseReplicaID) ([]string, error) { conn, err := cmd.getConn(id) if err != nil { - return []string{}, fmt.Errorf("failed to get connection for replica %s: %w", id, err) + return nil, fmt.Errorf("failed to get connection for replica %s: %w", id, err) } var warnings []string rows, err := conn.Query(ctx, "SELECT message FROM system.warnings") if err != nil { - return []string{}, fmt.Errorf("failed to query system.warnings on replica %s: %w", id, err) + return nil, fmt.Errorf("failed to query system.warnings on replica %s: %w", id, err) } defer func() { _ = rows.Close() @@ -139,14 +139,14 @@ func (cmd *commander) Warnings(ctx context.Context, id v1.ClickHouseReplicaID) ( for rows.Next() { var raw string if err := rows.Scan(&raw); err != nil { - return []string{}, fmt.Errorf("failed to get data from system.warnings: replica %s, %w", id, err) + return nil, fmt.Errorf("failed to get data from system.warnings: replica %s, %w", id, err) } warnings = append(warnings, raw) } if err := rows.Err(); err != nil { - return []string{}, fmt.Errorf("failed to scan data from system.warnings %s: %w", id, err) + return nil, fmt.Errorf("failed to scan data from system.warnings %s: %w", id, err) } return warnings, nil diff --git a/test/e2e/clickhouse_e2e_test.go b/test/e2e/clickhouse_e2e_test.go index 32fc84a2..789abe94 100644 --- a/test/e2e/clickhouse_e2e_test.go +++ b/test/e2e/clickhouse_e2e_test.go @@ -239,7 +239,7 @@ var _ = Describe("ClickHouse controller", Label("clickhouse"), func() { var seen []string for _, e := range events.Items { - if e.Reason == v1.EventWarningRecord && e.InvolvedObject.Name == cr.Name { + if e.Reason == v1.EventReasonClickHouseWarning && e.InvolvedObject.Name == cr.Name { seen = append(seen, e.Message) } } From 9222d785ca18d7e318981d9b4e7a61f7a6cd6461 Mon Sep 17 00:00:00 2001 From: Konstantin Vedernikov Date: Mon, 1 Jun 2026 17:07:53 +0000 Subject: [PATCH 10/12] fix --- internal/controller/clickhouse/sync.go | 43 ++++++++++++++------------ 1 file changed, 23 insertions(+), 20 deletions(-) diff --git a/internal/controller/clickhouse/sync.go b/internal/controller/clickhouse/sync.go index 361c70dd..10f1ebba 100644 --- a/internal/controller/clickhouse/sync.go +++ b/internal/controller/clickhouse/sync.go @@ -528,11 +528,10 @@ func (r *clickhouseReconciler) reconcileWarnings(ctx context.Context, log ctrlut return chctrl.StepResult{}, fmt.Errorf("list StatefulSets: %w", err) } - ctrlutil.ExecuteParallel(statefulSets.Items, func(sts appsv1.StatefulSet) (v1.ClickHouseReplicaID, []string, error) { + results := ctrlutil.ExecuteParallel(statefulSets.Items, func(sts appsv1.StatefulSet) (v1.ClickHouseReplicaID, struct{}, error) { id, err := v1.ClickHouseIDFromLabels(sts.Labels) if err != nil { - log.Error(err, "get replica ID from StatefulSet labels", "statefulset", sts.Name) - return v1.ClickHouseReplicaID{}, []string{}, fmt.Errorf("get replica ID from StatefulSet labels: %w", err) + return v1.ClickHouseReplicaID{}, struct{}{}, fmt.Errorf("get replica ID from StatefulSet labels: %w", err) } hasError, err := chctrl.CheckPodError(ctx, log, r.GetClient(), &sts) @@ -542,31 +541,35 @@ func (r *clickhouseReconciler) reconcileWarnings(ctx context.Context, log ctrlut hasError = true } - var warnings []string - - if !hasError && sts.Status.ReadyReplicas > 0 && r.commander != nil { - ctx, cancel := context.WithTimeout(ctx, chctrl.LoadReplicaStateTimeout) - defer cancel() + if hasError || sts.Status.ReadyReplicas == 0 || r.commander == nil { + return id, struct{}{}, nil + } - var wErr error + ctx, cancel := context.WithTimeout(ctx, chctrl.LoadReplicaStateTimeout) + defer cancel() - warnings, wErr = r.commander.Warnings(ctx, id) - if wErr != nil { - log.Debug("failed to get warnings from replica", "replica_id", id, "error", wErr) - } + warnings, err := r.commander.Warnings(ctx, id) + if err != nil { + return id, struct{}{}, fmt.Errorf("fetch warnings from replica %s: %w", id, err) + } - log.Info("system.warnings fetched", "replica_id", id, "count", len(warnings)) + log.Debug("system.warnings fetched", "replica_id", id, "count", len(warnings)) - for _, warning := range warnings { - r.GetRecorder().Eventf(r.Cluster, nil, corev1.EventTypeWarning, - v1.EventReasonClickHouseWarning, warningAction(warning), - "Replica %s: %s", r.Cluster.HostnameByID(id), warning) - } + for _, warning := range warnings { + r.GetRecorder().Eventf(r.Cluster, nil, corev1.EventTypeWarning, + v1.EventReasonClickHouseWarning, warningAction(warning), + "Replica %s: %s", r.Cluster.HostnameByID(id), warning) } - return id, warnings, nil + return id, struct{}{}, nil }) + for id, res := range results { + if res.Err != nil { + log.Warn("failed to publish replica warnings", "replica_id", id, "error", res.Err) + } + } + return chctrl.StepRequeue(chctrl.WarningsPollInterval), nil } From 53e7cb518b581fb21a194a76cbec757096c66be9 Mon Sep 17 00:00:00 2001 From: Konstantin Vedernikov Date: Mon, 15 Jun 2026 09:27:19 +0000 Subject: [PATCH 11/12] fix review --- internal/controller/clickhouse/sync.go | 32 ++++++++------------------ 1 file changed, 10 insertions(+), 22 deletions(-) diff --git a/internal/controller/clickhouse/sync.go b/internal/controller/clickhouse/sync.go index 10f1ebba..0fdbfdfd 100644 --- a/internal/controller/clickhouse/sync.go +++ b/internal/controller/clickhouse/sync.go @@ -514,34 +514,22 @@ func (r *clickhouseReconciler) reconcileActiveReplicaStatus(ctx context.Context, return chctrl.StepContinue(), nil } -func warningAction(msg string) string { +func warningAction(shard int32, replica, msg string) string { h := fnv.New64a() - _, _ = h.Write([]byte(msg)) - return fmt.Sprintf("Warning-%016x", h.Sum64()) + _, _ = fmt.Fprintf(h, "%d/%s/%s", shard, replica, msg) + return fmt.Sprintf("Warning-%d-%s-%016x", shard, replica, h.Sum64()) } func (r *clickhouseReconciler) reconcileWarnings(ctx context.Context, log ctrlutil.Logger) (chctrl.StepResult, error) { - listOpts := ctrlutil.AppRequirements(r.Cluster.Namespace, r.Cluster.SpecificName()) - - var statefulSets appsv1.StatefulSetList - if err := r.GetClient().List(ctx, &statefulSets, listOpts); err != nil { - return chctrl.StepResult{}, fmt.Errorf("list StatefulSets: %w", err) + if r.commander == nil { + return chctrl.StepRequeue(chctrl.WarningsPollInterval), nil } - results := ctrlutil.ExecuteParallel(statefulSets.Items, func(sts appsv1.StatefulSet) (v1.ClickHouseReplicaID, struct{}, error) { - id, err := v1.ClickHouseIDFromLabels(sts.Labels) - if err != nil { - return v1.ClickHouseReplicaID{}, struct{}{}, fmt.Errorf("get replica ID from StatefulSet labels: %w", err) - } - - hasError, err := chctrl.CheckPodError(ctx, log, r.GetClient(), &sts) - if err != nil { - log.Warn("failed to check replica pod error", "statefulset", sts.Name, "error", err) - - hasError = true - } + ids := slices.Collect(maps.Keys(r.ReplicaState)) - if hasError || sts.Status.ReadyReplicas == 0 || r.commander == nil { + results := ctrlutil.ExecuteParallel(ids, func(id v1.ClickHouseReplicaID) (v1.ClickHouseReplicaID, struct{}, error) { + replica := r.ReplicaState[id] + if replica.Error || replica.STS == nil || replica.STS.Status.ReadyReplicas == 0 { return id, struct{}{}, nil } @@ -557,7 +545,7 @@ func (r *clickhouseReconciler) reconcileWarnings(ctx context.Context, log ctrlut for _, warning := range warnings { r.GetRecorder().Eventf(r.Cluster, nil, corev1.EventTypeWarning, - v1.EventReasonClickHouseWarning, warningAction(warning), + v1.EventReasonClickHouseWarning, warningAction(r.Cluster.Shards(), r.Cluster.HostnameByID(id), warning), "Replica %s: %s", r.Cluster.HostnameByID(id), warning) } From 7a6469b1f88202ec622c698b54bea12fb0c58c0c Mon Sep 17 00:00:00 2001 From: Pervakov Grigorii Date: Mon, 15 Jun 2026 14:15:35 +0200 Subject: [PATCH 12/12] fix warning action id --- internal/controller/clickhouse/sync.go | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/internal/controller/clickhouse/sync.go b/internal/controller/clickhouse/sync.go index 0fdbfdfd..0423d3da 100644 --- a/internal/controller/clickhouse/sync.go +++ b/internal/controller/clickhouse/sync.go @@ -514,10 +514,10 @@ func (r *clickhouseReconciler) reconcileActiveReplicaStatus(ctx context.Context, return chctrl.StepContinue(), nil } -func warningAction(shard int32, replica, msg string) string { +func warningAction(id v1.ClickHouseReplicaID, msg string) string { h := fnv.New64a() - _, _ = fmt.Fprintf(h, "%d/%s/%s", shard, replica, msg) - return fmt.Sprintf("Warning-%d-%s-%016x", shard, replica, h.Sum64()) + _, _ = fmt.Fprintf(h, "%d/%d/%s", id.ShardID, id.Index, msg) + return fmt.Sprintf("Warning-%d-%d-%016x", id.ShardID, id.Index, h.Sum64()) } func (r *clickhouseReconciler) reconcileWarnings(ctx context.Context, log ctrlutil.Logger) (chctrl.StepResult, error) { @@ -545,7 +545,7 @@ func (r *clickhouseReconciler) reconcileWarnings(ctx context.Context, log ctrlut for _, warning := range warnings { r.GetRecorder().Eventf(r.Cluster, nil, corev1.EventTypeWarning, - v1.EventReasonClickHouseWarning, warningAction(r.Cluster.Shards(), r.Cluster.HostnameByID(id), warning), + v1.EventReasonClickHouseWarning, warningAction(id, warning), "Replica %s: %s", r.Cluster.HostnameByID(id), warning) }