diff --git a/api/v1alpha1/events.go b/api/v1alpha1/events.go index 83689fa..959b2e0 100644 --- a/api/v1alpha1/events.go +++ b/api/v1alpha1/events.go @@ -38,6 +38,11 @@ const ( EventReasonUpgradeAvailable EventReason = "VersionUpgradeAvailable" ) +// ClickHouse warning event. +const ( + EventReasonClickHouseWarning EventReason = "ClickHouseWarning" +) + // EventAction represents the action associated with an event. type EventAction = string diff --git a/internal/controller/clickhouse/commands.go b/internal/controller/clickhouse/commands.go index b283852..92b42c8 100644 --- a/internal/controller/clickhouse/commands.go +++ b/internal/controller/clickhouse/commands.go @@ -119,6 +119,39 @@ func (cmd *commander) Probe(ctx context.Context, id v1.ClickHouseReplicaID) (rep return probe, nil } +// Reads system warnings from the server. +func (cmd *commander) Warnings(ctx context.Context, id v1.ClickHouseReplicaID) ([]string, error) { + conn, err := cmd.getConn(id) + if err != nil { + return nil, fmt.Errorf("failed to get connection for replica %s: %w", id, err) + } + + var warnings []string + + rows, err := conn.Query(ctx, "SELECT message FROM system.warnings") + if err != nil { + return nil, fmt.Errorf("failed to query system.warnings on replica %s: %w", id, err) + } + defer func() { + _ = rows.Close() + }() + + for rows.Next() { + var raw string + if err := rows.Scan(&raw); err != nil { + return nil, fmt.Errorf("failed to get data from system.warnings: replica %s, %w", id, err) + } + + warnings = append(warnings, raw) + } + + if err := rows.Err(); err != nil { + return nil, fmt.Errorf("failed to scan data from system.warnings %s: %w", id, err) + } + + return warnings, nil +} + // ReloadConfig queries the replica to reload its configuration. func (cmd *commander) ReloadConfig(ctx context.Context, id v1.ClickHouseReplicaID) error { conn, err := cmd.getConn(id) diff --git a/internal/controller/clickhouse/sync.go b/internal/controller/clickhouse/sync.go index 010917d..0423d3d 100644 --- a/internal/controller/clickhouse/sync.go +++ b/internal/controller/clickhouse/sync.go @@ -4,6 +4,7 @@ import ( "cmp" "context" "fmt" + "hash/fnv" "maps" "slices" "strconv" @@ -130,6 +131,7 @@ func (r *clickhouseReconciler) sync(ctx context.Context, log ctrlutil.Logger) (c {Name: "ClusterSecret", Fn: r.reconcileClusterSecret, Always: true}, {Name: "ExternalSecret", Fn: r.reconcileExternalSecret, Always: true}, {Name: "ActiveReplicaStatus", Fn: r.reconcileActiveReplicaStatus, Always: true}, + {Name: "Warnings", Fn: r.reconcileWarnings, Always: true}, {Name: "ClusterRevisions", Fn: r.reconcileClusterRevisions, Always: true}, {Name: "ReplicaResources", Fn: r.reconcileReplicaResources}, {Name: "DatabaseSync", Fn: r.reconcileDatabaseSync}, @@ -512,6 +514,53 @@ func (r *clickhouseReconciler) reconcileActiveReplicaStatus(ctx context.Context, return chctrl.StepContinue(), nil } +func warningAction(id v1.ClickHouseReplicaID, msg string) string { + h := fnv.New64a() + _, _ = fmt.Fprintf(h, "%d/%d/%s", id.ShardID, id.Index, msg) + return fmt.Sprintf("Warning-%d-%d-%016x", id.ShardID, id.Index, h.Sum64()) +} + +func (r *clickhouseReconciler) reconcileWarnings(ctx context.Context, log ctrlutil.Logger) (chctrl.StepResult, error) { + if r.commander == nil { + return chctrl.StepRequeue(chctrl.WarningsPollInterval), nil + } + + ids := slices.Collect(maps.Keys(r.ReplicaState)) + + results := ctrlutil.ExecuteParallel(ids, func(id v1.ClickHouseReplicaID) (v1.ClickHouseReplicaID, struct{}, error) { + replica := r.ReplicaState[id] + if replica.Error || replica.STS == nil || replica.STS.Status.ReadyReplicas == 0 { + return id, struct{}{}, nil + } + + ctx, cancel := context.WithTimeout(ctx, chctrl.LoadReplicaStateTimeout) + defer cancel() + + warnings, err := r.commander.Warnings(ctx, id) + if err != nil { + return id, struct{}{}, fmt.Errorf("fetch warnings from replica %s: %w", id, err) + } + + log.Debug("system.warnings fetched", "replica_id", id, "count", len(warnings)) + + for _, warning := range warnings { + r.GetRecorder().Eventf(r.Cluster, nil, corev1.EventTypeWarning, + v1.EventReasonClickHouseWarning, warningAction(id, warning), + "Replica %s: %s", r.Cluster.HostnameByID(id), warning) + } + + return id, struct{}{}, nil + }) + + for id, res := range results { + if res.Err != nil { + log.Warn("failed to publish replica warnings", "replica_id", id, "error", res.Err) + } + } + + return chctrl.StepRequeue(chctrl.WarningsPollInterval), nil +} + func (r *clickhouseReconciler) reconcileClusterRevisions(ctx context.Context, log ctrlutil.Logger) (chctrl.StepResult, error) { if r.Cluster.Status.ObservedGeneration != r.Cluster.Generation { r.Cluster.Status.ObservedGeneration = r.Cluster.Generation diff --git a/internal/controller/constants.go b/internal/controller/constants.go index 9dd04c3..91147db 100644 --- a/internal/controller/constants.go +++ b/internal/controller/constants.go @@ -9,6 +9,7 @@ import ( const ( RequeueOnRefreshTimeout = time.Second LoadReplicaStateTimeout = 10 * time.Second + WarningsPollInterval = 30 * time.Second TLSFileMode int32 = 0444 DefaultUser int64 = 101 ) diff --git a/test/e2e/clickhouse_e2e_test.go b/test/e2e/clickhouse_e2e_test.go index 0e10b3c..789abe9 100644 --- a/test/e2e/clickhouse_e2e_test.go +++ b/test/e2e/clickhouse_e2e_test.go @@ -172,6 +172,83 @@ var _ = Describe("ClickHouse controller", Label("clickhouse"), func() { Entry("scale down to 2 replicas", 3, v1.ClickHouseClusterSpec{Replicas: new(int32(2))}), ) + It("should surface system.warnings as ClickHouseWarning events", func(ctx context.Context) { + cr := v1.ClickHouseCluster{ + ObjectMeta: metav1.ObjectMeta{ + Namespace: ns, + Name: fmt.Sprintf("warnings-%d", rand.Uint32()), //nolint:gosec + }, + Spec: v1.ClickHouseClusterSpec{ + Replicas: new(int32(1)), + ContainerTemplate: v1.ContainerTemplateSpec{ + Image: v1.ContainerImage{Tag: BaseVersion}, + }, + DataVolumeClaimSpec: &defaultStorage, + KeeperClusterRef: v1.KeeperClusterReference{Name: keeper.Name}, + Settings: v1.ClickHouseSettings{ + ExtraConfig: runtime.RawExtension{Raw: []byte(`{"max_table_num_to_warn": 1}`)}, + }, + }, + } + + By("creating cluster CR") + Expect(k8sClient.Create(ctx, &cr)).To(Succeed()) + DeferCleanup(func(ctx context.Context) { + By("deleting cluster CR") + Expect(k8sClient.Delete(ctx, &cr)).To(Succeed()) + }) + WaitClickHouseUpdatedAndReady(ctx, &cr, 2*time.Minute, false) + + By("creating enough tables to exceed max_table_num_to_warn") + + chClient, err := testutil.NewClickHouseClient(ctx, podDialer, &cr) + Expect(err).NotTo(HaveOccurred()) + + defer chClient.Close() + + for i := range 2 { + Expect(chClient.Exec(ctx, fmt.Sprintf( + "CREATE TABLE IF NOT EXISTS default.warn_t%d (x UInt64) ENGINE = MergeTree ORDER BY x", i))).To(Succeed()) + } + + const tableWarning = "The number of attached tables" + + By("confirming the table-count warning is present in system.warnings table") + Eventually(func(g Gomega) { + rows, err := chClient.Query(ctx, "SELECT message FROM system.warnings") + g.Expect(err).NotTo(HaveOccurred()) + + defer func() { _ = rows.Close() }() + + var warnings []string + for rows.Next() { + var m string + g.Expect(rows.Scan(&m)).To(Succeed()) + warnings = append(warnings, m) + } + + g.Expect(rows.Err()).NotTo(HaveOccurred()) + g.Expect(warnings).To(ContainElement(ContainSubstring(tableWarning)), + "expected the table-count warning in system.warnings, got: %v", warnings) + }).WithPolling(pollingInterval).WithTimeout(time.Minute).Should(Succeed()) + + By("asserting the operator surfaced it as a ClickHouseWarning event") + Eventually(func(g Gomega) { + var events corev1.EventList + g.Expect(k8sClient.List(ctx, &events, client.InNamespace(ns))).To(Succeed()) + + var seen []string + for _, e := range events.Items { + if e.Reason == v1.EventReasonClickHouseWarning && e.InvolvedObject.Name == cr.Name { + seen = append(seen, e.Message) + } + } + + g.Expect(seen).To(ContainElement(ContainSubstring(tableWarning)), + "no ClickHouseWarning event carrying %q; events seen: %v", tableWarning, seen) + }).WithPolling(pollingInterval).WithTimeout(2 * time.Minute).Should(Succeed()) + }) + It("should work with custom data folder mount", func(ctx context.Context) { cr := v1.ClickHouseCluster{ ObjectMeta: metav1.ObjectMeta{ diff --git a/test/testutil/utils.go b/test/testutil/utils.go index ce552a8..137db3a 100644 --- a/test/testutil/utils.go +++ b/test/testutil/utils.go @@ -134,7 +134,7 @@ func InstallCertManager(ctx context.Context) error { cmd = exec.CommandContext(ctx, "kubectl", "wait", "deployment.apps/cert-manager-webhook", "--for", "condition=Available", "--namespace", "cert-manager", - "--timeout", "5m", + "--timeout", "10m", ) _, err := Run(cmd)