Skip to content
5 changes: 5 additions & 0 deletions api/v1alpha1/events.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,11 @@ const (
EventReasonUpgradeAvailable EventReason = "VersionUpgradeAvailable"
)

// Event reasons for events that relay a warning reported by ClickHouse.
const (
// EventReasonClickHouseWarning is the reason ("ClickHouseWarning") set on such events.
EventReasonClickHouseWarning EventReason = "ClickHouseWarning"
)
Comment thread
scanhex12 marked this conversation as resolved.

// EventAction represents the action associated with an event.
// It is declared as an alias of string, so plain string values are accepted
// wherever an EventAction is expected.
type EventAction = string

Expand Down
33 changes: 33 additions & 0 deletions internal/controller/clickhouse/commands.go
Original file line number Diff line number Diff line change
Expand Up @@ -119,6 +119,39 @@ func (cmd *commander) Probe(ctx context.Context, id v1.ClickHouseReplicaID) (rep
return probe, nil
}

// Warnings returns the messages currently listed in system.warnings on the given replica.
//
// It returns a nil slice and an error when no connection to the replica can be
// obtained, when the query fails, or when a row cannot be read. A replica that
// reports no warnings yields a nil slice and a nil error.
func (cmd *commander) Warnings(ctx context.Context, id v1.ClickHouseReplicaID) ([]string, error) {
	conn, err := cmd.getConn(id)
	if err != nil {
		return nil, fmt.Errorf("failed to get connection for replica %s: %w", id, err)
	}

	rows, err := conn.Query(ctx, "SELECT message FROM system.warnings")
	if err != nil {
		return nil, fmt.Errorf("failed to query system.warnings on replica %s: %w", id, err)
	}
	// Closing is best-effort: read failures are reported through Scan and
	// rows.Err below, so the Close error is deliberately ignored.
	defer func() {
		_ = rows.Close()
	}()

	var warnings []string

	for rows.Next() {
		var msg string
		if err := rows.Scan(&msg); err != nil {
			return nil, fmt.Errorf("failed to scan system.warnings row on replica %s: %w", id, err)
		}

		warnings = append(warnings, msg)
	}

	// Next returning false may mean either end of data or a failed read; Err tells them apart.
	if err := rows.Err(); err != nil {
		return nil, fmt.Errorf("failed to read system.warnings rows on replica %s: %w", id, err)
	}

	return warnings, nil
}

// ReloadConfig queries the replica to reload its configuration.
func (cmd *commander) ReloadConfig(ctx context.Context, id v1.ClickHouseReplicaID) error {
conn, err := cmd.getConn(id)
Expand Down
49 changes: 49 additions & 0 deletions internal/controller/clickhouse/sync.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"cmp"
"context"
"fmt"
"hash/fnv"
"maps"
"slices"
"strconv"
Expand Down Expand Up @@ -130,6 +131,7 @@ func (r *clickhouseReconciler) sync(ctx context.Context, log ctrlutil.Logger) (c
{Name: "ClusterSecret", Fn: r.reconcileClusterSecret, Always: true},
{Name: "ExternalSecret", Fn: r.reconcileExternalSecret, Always: true},
{Name: "ActiveReplicaStatus", Fn: r.reconcileActiveReplicaStatus, Always: true},
{Name: "Warnings", Fn: r.reconcileWarnings, Always: true},
{Name: "ClusterRevisions", Fn: r.reconcileClusterRevisions, Always: true},
{Name: "ReplicaResources", Fn: r.reconcileReplicaResources},
{Name: "DatabaseSync", Fn: r.reconcileDatabaseSync},
Expand Down Expand Up @@ -512,6 +514,53 @@ func (r *clickhouseReconciler) reconcileActiveReplicaStatus(ctx context.Context,
return chctrl.StepContinue(), nil
}

// warningAction builds the event action string for a warning message reported by a replica.
//
// The result has the form "Warning-<shard>-<index>-<hash>", where <hash> is the
// 64-bit FNV-1a digest of "<shard>/<index>/<message>" rendered as 16 hex digits.
// The same replica and message therefore always produce the same action.
func warningAction(id v1.ClickHouseReplicaID, msg string) string {
	key := fmt.Sprintf("%d/%d/%s", id.ShardID, id.Index, msg)

	digest := fnv.New64a()
	// Writing to an FNV hash never fails, so the result is discarded.
	_, _ = digest.Write([]byte(key))

	return fmt.Sprintf("Warning-%d-%d-%016x", id.ShardID, id.Index, digest.Sum64())
}

func (r *clickhouseReconciler) reconcileWarnings(ctx context.Context, log ctrlutil.Logger) (chctrl.StepResult, error) {
Comment thread
GrigoryPervakov marked this conversation as resolved.
if r.commander == nil {
return chctrl.StepRequeue(chctrl.WarningsPollInterval), nil
}

ids := slices.Collect(maps.Keys(r.ReplicaState))

results := ctrlutil.ExecuteParallel(ids, func(id v1.ClickHouseReplicaID) (v1.ClickHouseReplicaID, struct{}, error) {
replica := r.ReplicaState[id]
if replica.Error || replica.STS == nil || replica.STS.Status.ReadyReplicas == 0 {
return id, struct{}{}, nil
}

ctx, cancel := context.WithTimeout(ctx, chctrl.LoadReplicaStateTimeout)
defer cancel()

warnings, err := r.commander.Warnings(ctx, id)
if err != nil {
return id, struct{}{}, fmt.Errorf("fetch warnings from replica %s: %w", id, err)
}

log.Debug("system.warnings fetched", "replica_id", id, "count", len(warnings))

for _, warning := range warnings {
r.GetRecorder().Eventf(r.Cluster, nil, corev1.EventTypeWarning,
v1.EventReasonClickHouseWarning, warningAction(id, warning),
"Replica %s: %s", r.Cluster.HostnameByID(id), warning)
}

return id, struct{}{}, nil
})

for id, res := range results {
if res.Err != nil {
log.Warn("failed to publish replica warnings", "replica_id", id, "error", res.Err)
}
}

return chctrl.StepRequeue(chctrl.WarningsPollInterval), nil
Comment thread
scanhex12 marked this conversation as resolved.
}

func (r *clickhouseReconciler) reconcileClusterRevisions(ctx context.Context, log ctrlutil.Logger) (chctrl.StepResult, error) {
if r.Cluster.Status.ObservedGeneration != r.Cluster.Generation {
r.Cluster.Status.ObservedGeneration = r.Cluster.Generation
Expand Down
1 change: 1 addition & 0 deletions internal/controller/constants.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import (
// Shared controller timing, file-mode and user constants.
const (
// RequeueOnRefreshTimeout is one second.
// NOTE(review): presumably the requeue delay used after a refresh times out — confirm against callers.
RequeueOnRefreshTimeout = time.Second
// LoadReplicaStateTimeout is the 10-second deadline applied to a single per-replica query;
// the ClickHouse warnings step uses it when reading system.warnings.
LoadReplicaStateTimeout = 10 * time.Second
// WarningsPollInterval is the 30-second requeue delay returned by the ClickHouse warnings step.
WarningsPollInterval = 30 * time.Second
// TLSFileMode is the octal file mode 0444 (read-only for owner, group and others).
// NOTE(review): presumably applied to mounted TLS files — confirm against callers.
TLSFileMode int32 = 0444
// DefaultUser is the numeric ID 101.
// NOTE(review): presumably the UID the containers run as — confirm against callers.
DefaultUser int64 = 101
)
Expand Down
77 changes: 77 additions & 0 deletions test/e2e/clickhouse_e2e_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -172,6 +172,83 @@ var _ = Describe("ClickHouse controller", Label("clickhouse"), func() {
Entry("scale down to 2 replicas", 3, v1.ClickHouseClusterSpec{Replicas: new(int32(2))}),
)

// End-to-end check that a message present in ClickHouse's system.warnings table
// is published as a Kubernetes event with reason ClickHouseWarning for the cluster.
It("should surface system.warnings as ClickHouseWarning events", func(ctx context.Context) {
// Single-replica cluster with a random name suffix.
cr := v1.ClickHouseCluster{
ObjectMeta: metav1.ObjectMeta{
Namespace: ns,
Name: fmt.Sprintf("warnings-%d", rand.Uint32()), //nolint:gosec
},
Spec: v1.ClickHouseClusterSpec{
Replicas: new(int32(1)),
ContainerTemplate: v1.ContainerTemplateSpec{
Image: v1.ContainerImage{Tag: BaseVersion},
},
DataVolumeClaimSpec: &defaultStorage,
KeeperClusterRef: v1.KeeperClusterReference{Name: keeper.Name},
Settings: v1.ClickHouseSettings{
// Set the table-count warning threshold to 1 so the two tables created below exceed it.
ExtraConfig: runtime.RawExtension{Raw: []byte(`{"max_table_num_to_warn": 1}`)},
},
},
}

By("creating cluster CR")
Expect(k8sClient.Create(ctx, &cr)).To(Succeed())
// Register deletion of the CR as cleanup for this spec.
DeferCleanup(func(ctx context.Context) {
By("deleting cluster CR")
Expect(k8sClient.Delete(ctx, &cr)).To(Succeed())
})
WaitClickHouseUpdatedAndReady(ctx, &cr, 2*time.Minute, false)

By("creating enough tables to exceed max_table_num_to_warn")

chClient, err := testutil.NewClickHouseClient(ctx, podDialer, &cr)
Expect(err).NotTo(HaveOccurred())

defer chClient.Close()

// Two tables (warn_t0, warn_t1); IF NOT EXISTS makes the statements safe to repeat.
for i := range 2 {
Expect(chClient.Exec(ctx, fmt.Sprintf(
"CREATE TABLE IF NOT EXISTS default.warn_t%d (x UInt64) ENGINE = MergeTree ORDER BY x", i))).To(Succeed())
}

// Substring expected in both the system.warnings row and the resulting event message.
const tableWarning = "The number of attached tables"

// Stage 1: poll the server directly until it reports the warning, so a failure in
// stage 2 points at the operator rather than at ClickHouse not raising the warning.
By("confirming the table-count warning is present in system.warnings table")
Eventually(func(g Gomega) {
rows, err := chClient.Query(ctx, "SELECT message FROM system.warnings")
g.Expect(err).NotTo(HaveOccurred())

defer func() { _ = rows.Close() }()

var warnings []string
for rows.Next() {
var m string
g.Expect(rows.Scan(&m)).To(Succeed())
warnings = append(warnings, m)
}

g.Expect(rows.Err()).NotTo(HaveOccurred())
g.Expect(warnings).To(ContainElement(ContainSubstring(tableWarning)),
"expected the table-count warning in system.warnings, got: %v", warnings)
}).WithPolling(pollingInterval).WithTimeout(time.Minute).Should(Succeed())

// Stage 2: poll the namespace's events until one with the ClickHouseWarning reason,
// attached to this cluster, carries the warning text.
By("asserting the operator surfaced it as a ClickHouseWarning event")
Eventually(func(g Gomega) {
var events corev1.EventList
g.Expect(k8sClient.List(ctx, &events, client.InNamespace(ns))).To(Succeed())

// Collect messages of matching events only; the list is also used in the failure output.
var seen []string
for _, e := range events.Items {
if e.Reason == v1.EventReasonClickHouseWarning && e.InvolvedObject.Name == cr.Name {
seen = append(seen, e.Message)
}
}

g.Expect(seen).To(ContainElement(ContainSubstring(tableWarning)),
"no ClickHouseWarning event carrying %q; events seen: %v", tableWarning, seen)
}).WithPolling(pollingInterval).WithTimeout(2 * time.Minute).Should(Succeed())
})

It("should work with custom data folder mount", func(ctx context.Context) {
cr := v1.ClickHouseCluster{
ObjectMeta: metav1.ObjectMeta{
Expand Down
2 changes: 1 addition & 1 deletion test/testutil/utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -134,7 +134,7 @@ func InstallCertManager(ctx context.Context) error {
cmd = exec.CommandContext(ctx, "kubectl", "wait", "deployment.apps/cert-manager-webhook",
"--for", "condition=Available",
"--namespace", "cert-manager",
"--timeout", "5m",
"--timeout", "10m",
)

_, err := Run(cmd)
Expand Down
Loading