Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
* [FEATURE] Querier: Add experimental projection pushdown support in Parquet Queryable. #7152
* [FEATURE] Ingester: Add experimental active series queried metric. #7173
* [ENHANCEMENT] Querier: Add `-querier.store-gateway-series-batch-size` flag to configure the maximum number of series to be batched in a single gRPC response message from Store Gateways. #7203
* [ENHANCEMENT] HATracker: Add a local cache warmup on startup to reduce redundant KV store operations after restart. #7213
* [ENHANCEMENT] Ingester: Add support for ingesting Native Histogram with Custom Buckets. #7191
* [ENHANCEMENT] Ingester: Optimize labels out-of-order (ooo) check by allowing the iteration to terminate immediately upon finding the first unsorted label. #7186
* [ENHANCEMENT] Distributor: Skip attaching `__unit__` and `__type__` labels when `-distributor.enable-type-and-unit-labels` is enabled, as these are appended from metadata. #7145
Expand Down
77 changes: 76 additions & 1 deletion pkg/ha/ha_tracker.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (
"errors"
"flag"
"fmt"
"maps"
"math/rand"
"slices"
"strings"
Expand Down Expand Up @@ -222,10 +223,84 @@ func NewHATracker(cfg HATrackerConfig, limits HATrackerLimits, trackerStatusConf
t.client = client
}

t.Service = services.NewBasicService(nil, t.loop, nil)
t.Service = services.NewBasicService(t.syncKVStoreToLocalMap, t.loop, nil)
return t, nil
}

// syncKVStoreToLocalMap warms up the local cache by fetching all active entries from the KV store.
func (c *HATracker) syncKVStoreToLocalMap(ctx context.Context) error {
if !c.cfg.EnableHATracker {
return nil
}

start := time.Now()
level.Info(c.logger).Log("msg", "starting HA tracker cache warmup")

keys, err := c.client.List(ctx, "")
if err != nil {
level.Error(c.logger).Log("msg", "failed to list keys during HA tracker cache warmup", "err", err)
return err
}

if len(keys) == 0 {
level.Info(c.logger).Log("msg", "HA tracker cache warmup finished", "reason", "no keys found in KV store")
return nil
}

// create temporarily map
tempElected := make(map[string]ReplicaDesc, len(keys))
tempReplicaGroups := make(map[string]map[string]struct{})
successCount := 0

for _, key := range keys {
if ctx.Err() != nil {
return ctx.Err()
}

val, err := c.client.Get(ctx, key)
if err != nil {
level.Warn(c.logger).Log("msg", "failed to fetch key during cache warmup", "key", key, "err", err)
continue
}

desc, ok := val.(*ReplicaDesc)
if !ok || desc == nil || desc.DeletedAt > 0 {
continue
}

user, cluster, keyHasSeparator := strings.Cut(key, "/")
if !keyHasSeparator {
continue
}

tempElected[key] = *desc
if tempReplicaGroups[user] == nil {
tempReplicaGroups[user] = make(map[string]struct{})
}
tempReplicaGroups[user][cluster] = struct{}{}
successCount++
}

c.electedLock.Lock()
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I am not sure this is needed. as this is called before running the loop and starting the service.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You are right that there is no lock contention currently, as the Distributor accepts traffic only after the HATracker is in the Running state.
However, since this lock is acquired only once at startup, the runtime performance cost is effectively zero. I think it is better to keep it as a safety guard to protect the system against any future changes in the calling order or potential mistakes.
WDYT?


// Update local map
maps.Copy(c.elected, tempElected)
for user, clusters := range tempReplicaGroups {
if c.replicaGroups[user] == nil {
c.replicaGroups[user] = make(map[string]struct{})
}
for cluster := range clusters {
c.replicaGroups[user][cluster] = struct{}{}
}
}
c.electedLock.Unlock()
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Same here


c.updateUserReplicaGroupCount()

level.Info(c.logger).Log("msg", "HA tracker cache warmup completed", "duration", time.Since(start), "synced keys", successCount)
return nil
}

// Follows pattern used by ring for WatchKey.
func (c *HATracker) loop(ctx context.Context) error {
if !c.cfg.EnableHATracker {
Expand Down
144 changes: 144 additions & 0 deletions pkg/ha/ha_tracker_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -758,6 +758,150 @@ func TestCheckReplicaCleanup(t *testing.T) {
))
}

// BenchmarkHATracker_syncKVStoreToLocalMap measures the cache warmup cost
// for KV stores holding an increasing number of replica entries.
func BenchmarkHATracker_syncKVStoreToLocalMap(b *testing.B) {
	keyCounts := []int{100, 1000, 10000}

	for _, count := range keyCounts {
		b.Run(fmt.Sprintf("keys=%d", count), func(b *testing.B) {
			ctx := context.Background()

			codec := GetReplicaDescCodec()
			kvStore, closer := consul.NewInMemoryClient(codec, log.NewNopLogger(), nil)
			b.Cleanup(func() { assert.NoError(b, closer.Close()) })

			mockKV := kv.PrefixClient(kvStore, "prefix")

			// Seed the KV store with `count` entries spread over 100 users.
			for i := range count {
				key := fmt.Sprintf("user-%d/cluster-%d", i%100, i)
				desc := &ReplicaDesc{
					Replica:    fmt.Sprintf("replica-%d", i),
					ReceivedAt: timestamp.FromTime(time.Now()),
				}
				err := mockKV.CAS(ctx, key, func(_ any) (any, bool, error) {
					return desc, true, nil
				})
				require.NoError(b, err)
			}

			cfg := HATrackerConfig{
				EnableHATracker: true,
				KVStore:         kv.Config{Mock: mockKV},
			}
			// Previously the error was discarded; a construction failure
			// would nil-pointer inside the benchmark loop below.
			tracker, err := NewHATracker(cfg, trackerLimits{}, haTrackerStatusConfig, nil, "bench", log.NewNopLogger())
			require.NoError(b, err)

			b.ReportAllocs()
			for b.Loop() {
				if err := tracker.syncKVStoreToLocalMap(ctx); err != nil {
					b.Fatal(err)
				}
			}
		})
	}
}

// TestHATracker_CacheWarmupOnStart verifies that starting the tracker warms
// the local elected/replicaGroups caches from entries pre-seeded in the KV
// store, that entries with DeletedAt set are skipped, and that the per-user
// replica group metric reflects only the live entries.
func TestHATracker_CacheWarmupOnStart(t *testing.T) {
	t.Parallel()
	ctx := context.Background()
	reg := prometheus.NewPedanticRegistry()

	codec := GetReplicaDescCodec()
	kvStore, closer := consul.NewInMemoryClient(codec, log.NewNopLogger(), nil)
	t.Cleanup(func() { assert.NoError(t, closer.Close()) })

	mockKV := kv.PrefixClient(kvStore, "prefix")

	// Seed a valid entry for user1 before the tracker is constructed, so the
	// only way it can reach the local cache is via startup warmup.
	user1 := "user1"
	clusterUser1 := "clusterUser1"
	key1 := fmt.Sprintf("%s/%s", user1, clusterUser1)
	desc1 := &ReplicaDesc{
		Replica:    "replica-0",
		ReceivedAt: timestamp.FromTime(time.Now()),
	}

	err := mockKV.CAS(ctx, key1, func(_ any) (any, bool, error) {
		return desc1, true, nil
	})
	require.NoError(t, err)

	// Second valid entry, for a different user.
	user2 := "user2"
	clusterUser2 := "clusterUser2"
	key2 := fmt.Sprintf("%s/%s", user2, clusterUser2)
	desc2 := &ReplicaDesc{
		Replica:    "replica-0",
		ReceivedAt: timestamp.FromTime(time.Now()),
	}
	err = mockKV.CAS(ctx, key2, func(_ any) (any, bool, error) {
		return desc2, true, nil
	})
	require.NoError(t, err)

	// Seed an entry marked as deleted; warmup must skip it.
	clusterDeleted := "clusterDeleted"
	keyDeleted := fmt.Sprintf("%s/%s", user1, clusterDeleted)
	descDeleted := &ReplicaDesc{
		Replica:    "replica-old",
		ReceivedAt: timestamp.FromTime(time.Now()),
		DeletedAt:  timestamp.FromTime(time.Now()), // Marked as deleted
	}
	err = mockKV.CAS(ctx, keyDeleted, func(_ any) (any, bool, error) {
		return descDeleted, true, nil
	})
	require.NoError(t, err)

	cfg := HATrackerConfig{
		EnableHATracker:        true,
		KVStore:                kv.Config{Mock: mockKV}, // Use the seeded KV
		UpdateTimeout:          time.Second,
		UpdateTimeoutJitterMax: 0,
		FailoverTimeout:        time.Second,
	}

	tracker, err := NewHATracker(cfg, trackerLimits{maxReplicaGroups: 100}, haTrackerStatusConfig, prometheus.WrapRegistererWithPrefix("cortex_", reg), "test-ha-tracker", log.NewNopLogger())
	require.NoError(t, err)

	// Starting the service runs syncKVStoreToLocalMap before the watch loop.
	require.NoError(t, services.StartAndAwaitRunning(ctx, tracker))
	defer services.StopAndAwaitTerminated(ctx, tracker) // nolint:errcheck

	// Hold the lock while inspecting internal state: the watch loop is
	// running concurrently at this point.
	tracker.electedLock.Lock()
	// Both valid entries must be present in the warmed-up local cache.
	desc1Cached, ok := tracker.elected[key1]
	require.True(t, ok)
	require.Equal(t, desc1.Replica, desc1Cached.Replica)

	// The deleted entry must not have been loaded.
	_, ok = tracker.elected[keyDeleted]
	require.False(t, ok)

	desc2Cached, ok := tracker.elected[key2]
	require.True(t, ok)
	require.Equal(t, desc2.Replica, desc2Cached.Replica)

	// user1 should have 1 group (clusterUser1), ignoring clusterDeleted
	require.NotNil(t, tracker.replicaGroups[user1])
	require.Equal(t, 1, len(tracker.replicaGroups[user1]))
	_, hasClusterUser1 := tracker.replicaGroups[user1][clusterUser1]
	require.True(t, hasClusterUser1)

	// user2 should have 1 group (clusterUser2), ignoring clusterDeleted
	require.NotNil(t, tracker.replicaGroups[user2])
	require.Equal(t, 1, len(tracker.replicaGroups[user2]))
	_, hasClusterUser2 := tracker.replicaGroups[user2][clusterUser2]
	require.True(t, hasClusterUser2)

	tracker.electedLock.Unlock()

	// The replica-group gauge must count only the live clusters (one per user).
	require.NoError(t, testutil.GatherAndCompare(reg, strings.NewReader(`
# HELP cortex_ha_tracker_user_replica_group_count Number of HA replica groups tracked for each user.
# TYPE cortex_ha_tracker_user_replica_group_count gauge
cortex_ha_tracker_user_replica_group_count{user="user1"} 1
cortex_ha_tracker_user_replica_group_count{user="user2"} 1
`), "cortex_ha_tracker_user_replica_group_count",
	))
}

func checkUserReplicaGroups(t *testing.T, duration time.Duration, c *HATracker, user string, expectedReplicaGroups int) {
t.Helper()
test.Poll(t, duration, nil, func() any {
Expand Down
Loading