From 77866879961ae997cabc624b993fa1966ef0583a Mon Sep 17 00:00:00 2001 From: SungJin1212 Date: Wed, 14 Jan 2026 19:16:32 +0900 Subject: [PATCH 1/2] HATracker: Add a local cache warmup on start Signed-off-by: SungJin1212 --- CHANGELOG.md | 1 + pkg/ha/ha_tracker.go | 77 +++++++++++++++++++++++++++- pkg/ha/ha_tracker_test.go | 102 ++++++++++++++++++++++++++++++++++++++ 3 files changed, 179 insertions(+), 1 deletion(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index f2df165baa0..bdc735582e9 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -14,6 +14,7 @@ * [FEATURE] Querier: Add experimental projection pushdown support in Parquet Queryable. #7152 * [FEATURE] Ingester: Add experimental active series queried metric. #7173 * [ENHANCEMENT] Querier: Add `-querier.store-gateway-series-batch-size` flag to configure the maximum number of series to be batched in a single gRPC response message from Store Gateways. #7203 +* [ENHANCEMENT] HATracker: Add a local cache warmup on startup to avoid unnecessary KV store operations. #7213 * [ENHANCEMENT] Ingester: Add support for ingesting Native Histogram with Custom Buckets. #7191 * [ENHANCEMENT] Ingester: Optimize labels out-of-order (ooo) check by allowing the iteration to terminate immediately upon finding the first unsorted label. #7186 * [ENHANCEMENT] Distributor: Skip attaching `__unit__` and `__type__` labels when `-distributor.enable-type-and-unit-labels` is enabled, as these are appended from metadata. 
#7145 diff --git a/pkg/ha/ha_tracker.go b/pkg/ha/ha_tracker.go index 06a4a6e1689..084e549970b 100644 --- a/pkg/ha/ha_tracker.go +++ b/pkg/ha/ha_tracker.go @@ -5,6 +5,7 @@ import ( "errors" "flag" "fmt" + "maps" "math/rand" "slices" "strings" @@ -222,10 +223,84 @@ func NewHATracker(cfg HATrackerConfig, limits HATrackerLimits, trackerStatusConf t.client = client } - t.Service = services.NewBasicService(nil, t.loop, nil) + t.Service = services.NewBasicService(t.syncKVStoreToLocalMap, t.loop, nil) return t, nil } +// syncKVStoreToLocalMap warms up the local cache by fetching all active entries from the KV store. +func (c *HATracker) syncKVStoreToLocalMap(ctx context.Context) error { + if !c.cfg.EnableHATracker { + return nil + } + + start := time.Now() + level.Info(c.logger).Log("msg", "starting HA tracker cache warmup") + + keys, err := c.client.List(ctx, "") + if err != nil { + level.Error(c.logger).Log("msg", "failed to list keys during HA tracker cache warmup", "err", err) + return err + } + + if len(keys) == 0 { + level.Info(c.logger).Log("msg", "HA tracker cache warmup finished", "reason", "no keys found in KV store") + return nil + } + + // Build temporary maps first so electedLock is held only for the final copy. + tempElected := make(map[string]ReplicaDesc, len(keys)) + tempReplicaGroups := make(map[string]map[string]struct{}) + successCount := 0 + + for _, key := range keys { + if ctx.Err() != nil { + return ctx.Err() + } + + val, err := c.client.Get(ctx, key) + if err != nil { + level.Warn(c.logger).Log("msg", "failed to fetch key during cache warmup", "key", key, "err", err) + continue + } + + desc, ok := val.(*ReplicaDesc) + if !ok || desc == nil || desc.DeletedAt > 0 { + continue + } + + user, cluster, keyHasSeparator := strings.Cut(key, "/") + if !keyHasSeparator { + continue + } + + tempElected[key] = *desc + if tempReplicaGroups[user] == nil { + tempReplicaGroups[user] = make(map[string]struct{}) + } + tempReplicaGroups[user][cluster] = struct{}{} + successCount++ + } + + c.electedLock.Lock() + + // 
Update local map + maps.Copy(c.elected, tempElected) + for user, clusters := range tempReplicaGroups { + if c.replicaGroups[user] == nil { + c.replicaGroups[user] = make(map[string]struct{}) + } + for cluster := range clusters { + c.replicaGroups[user][cluster] = struct{}{} + } + } + c.electedLock.Unlock() + + c.updateUserReplicaGroupCount() + + level.Info(c.logger).Log("msg", "HA tracker cache warmup completed", "duration", time.Since(start), "synced keys", successCount) + return nil +} + // Follows pattern used by ring for WatchKey. func (c *HATracker) loop(ctx context.Context) error { if !c.cfg.EnableHATracker { diff --git a/pkg/ha/ha_tracker_test.go b/pkg/ha/ha_tracker_test.go index 3d576082aae..6ecb2a44663 100644 --- a/pkg/ha/ha_tracker_test.go +++ b/pkg/ha/ha_tracker_test.go @@ -758,6 +758,108 @@ func TestCheckReplicaCleanup(t *testing.T) { )) } +func TestHATracker_CacheWarmupOnStart(t *testing.T) { + t.Parallel() + ctx := context.Background() + reg := prometheus.NewPedanticRegistry() + + codec := GetReplicaDescCodec() + kvStore, closer := consul.NewInMemoryClient(codec, log.NewNopLogger(), nil) + t.Cleanup(func() { assert.NoError(t, closer.Close()) }) + + mockKV := kv.PrefixClient(kvStore, "prefix") + + // CAS valid entry + user1 := "user1" + clusterUser1 := "clusterUser1" + key1 := fmt.Sprintf("%s/%s", user1, clusterUser1) + desc1 := &ReplicaDesc{ + Replica: "replica-0", + ReceivedAt: timestamp.FromTime(time.Now()), + } + + err := mockKV.CAS(ctx, key1, func(_ any) (any, bool, error) { + return desc1, true, nil + }) + require.NoError(t, err) + + user2 := "user2" + clusterUser2 := "clusterUser2" + key2 := fmt.Sprintf("%s/%s", user2, clusterUser2) + desc2 := &ReplicaDesc{ + Replica: "replica-0", + ReceivedAt: timestamp.FromTime(time.Now()), + } + err = mockKV.CAS(ctx, key2, func(_ any) (any, bool, error) { + return desc2, true, nil + }) + require.NoError(t, err) + + // CAS deleted entry + clusterDeleted := "clusterDeleted" + keyDeleted := fmt.Sprintf("%s/%s", 
user1, clusterDeleted) + descDeleted := &ReplicaDesc{ + Replica: "replica-old", + ReceivedAt: timestamp.FromTime(time.Now()), + DeletedAt: timestamp.FromTime(time.Now()), // Marked as deleted + } + err = mockKV.CAS(ctx, keyDeleted, func(_ any) (any, bool, error) { + return descDeleted, true, nil + }) + require.NoError(t, err) + + cfg := HATrackerConfig{ + EnableHATracker: true, + KVStore: kv.Config{Mock: mockKV}, // Use the seeded KV + UpdateTimeout: time.Second, + UpdateTimeoutJitterMax: 0, + FailoverTimeout: time.Second, + } + + tracker, err := NewHATracker(cfg, trackerLimits{maxReplicaGroups: 100}, haTrackerStatusConfig, prometheus.WrapRegistererWithPrefix("cortex_", reg), "test-ha-tracker", log.NewNopLogger()) + require.NoError(t, err) + + // Start ha tracker + require.NoError(t, services.StartAndAwaitRunning(ctx, tracker)) + defer services.StopAndAwaitTerminated(ctx, tracker) // nolint:errcheck + + tracker.electedLock.Lock() + // Check local cache updated + desc1Cached, ok := tracker.elected[key1] + require.True(t, ok) + require.Equal(t, desc1.Replica, desc1Cached.Replica) + + _, ok = tracker.elected[keyDeleted] + require.False(t, ok) + + desc2Cached, ok := tracker.elected[key2] + require.True(t, ok) + require.Equal(t, desc2.Replica, desc2Cached.Replica) + + // user1 should have 1 group (clusterUser1), ignoring clusterDeleted + require.NotNil(t, tracker.replicaGroups[user1]) + require.Equal(t, 1, len(tracker.replicaGroups[user1])) + _, hasClusterUser1 := tracker.replicaGroups[user1][clusterUser1] + require.True(t, hasClusterUser1) + + // user2 should have 1 group (clusterUser2), ignoring clusterDeleted + require.NotNil(t, tracker.replicaGroups[user2]) + require.Equal(t, 1, len(tracker.replicaGroups[user2])) + _, hasClusterUser2 := tracker.replicaGroups[user2][clusterUser2] + require.True(t, hasClusterUser2) + + tracker.electedLock.Unlock() + + // Check metric updated + require.NoError(t, testutil.GatherAndCompare(reg, strings.NewReader(` + # HELP 
cortex_ha_tracker_user_replica_group_count Number of HA replica groups tracked for each user. + # TYPE cortex_ha_tracker_user_replica_group_count gauge + cortex_ha_tracker_user_replica_group_count{user="user1"} 1 + cortex_ha_tracker_user_replica_group_count{user="user2"} 1 + `), "cortex_ha_tracker_user_replica_group_count", + )) +} + func checkUserReplicaGroups(t *testing.T, duration time.Duration, c *HATracker, user string, expectedReplicaGroups int) { t.Helper() test.Poll(t, duration, nil, func() any { From 2644e14505fb8100faa92f3c878c0ccec605318f Mon Sep 17 00:00:00 2001 From: SungJin1212 Date: Thu, 15 Jan 2026 16:54:25 +0900 Subject: [PATCH 2/2] Add benchmark Signed-off-by: SungJin1212 --- pkg/ha/ha_tracker_test.go | 42 +++++++++++++++++++++++++++++++++++++++ 1 file changed, 42 insertions(+) diff --git a/pkg/ha/ha_tracker_test.go b/pkg/ha/ha_tracker_test.go index 6ecb2a44663..2e07c83088a 100644 --- a/pkg/ha/ha_tracker_test.go +++ b/pkg/ha/ha_tracker_test.go @@ -758,6 +758,48 @@ func TestCheckReplicaCleanup(t *testing.T) { )) } +func BenchmarkHATracker_syncKVStoreToLocalMap(b *testing.B) { + keyCounts := []int{100, 1000, 10000} + + for _, count := range keyCounts { + b.Run(fmt.Sprintf("keys=%d", count), func(b *testing.B) { + ctx := context.Background() + + codec := GetReplicaDescCodec() + kvStore, closer := consul.NewInMemoryClient(codec, log.NewNopLogger(), nil) + b.Cleanup(func() { assert.NoError(b, closer.Close()) }) + + mockKV := kv.PrefixClient(kvStore, "prefix") + + for i := range count { + key := fmt.Sprintf("user-%d/cluster-%d", i%100, i) + desc := &ReplicaDesc{ + Replica: fmt.Sprintf("replica-%d", i), + ReceivedAt: timestamp.FromTime(time.Now()), + } + err := mockKV.CAS(ctx, key, func(_ any) (any, bool, error) { + return desc, true, nil + }) + require.NoError(b, err) + } + + cfg := HATrackerConfig{ + EnableHATracker: true, + KVStore: kv.Config{Mock: mockKV}, + } + tracker, _ := NewHATracker(cfg, trackerLimits{}, haTrackerStatusConfig, nil, "bench", 
log.NewNopLogger()) + + b.ReportAllocs() + for b.Loop() { + err := tracker.syncKVStoreToLocalMap(ctx) + if err != nil { + b.Fatal(err) + } + } + }) + } +} + func TestHATracker_CacheWarmupOnStart(t *testing.T) { t.Parallel() ctx := context.Background()