From 93ccc9a3d9c4de54c86e7a4e279173165372d99f Mon Sep 17 00:00:00 2001 From: Alan Protasio Date: Sat, 16 May 2026 00:08:18 +0000 Subject: [PATCH] =?UTF-8?q?feat(ruler):=20rule=20group=20select=20merging?= =?UTF-8?q?=20=E2=80=94=20reduce=20redundant=20ingester=20queries?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit When a rule group has multiple rules querying the same metric with overlapping matchers, pre-fetch the broadest query once and serve individual rules from the cached result with local filtering. - Parse rules' PromQL expressions, group by metric + expression structure - Detect superset matchers (e.g., =~".*" covers "blue" and "green") - Pre-fetch broadest rule's expression once via QueryFunc - Serve subsequent rules from cached Vector with local matcher filtering - Zero vendored changes — uses context injection via GroupEvalIterationFunc Config: --ruler.select-merger-enabled, --ruler.select-merger-min-rules Benchmark: 114 → 38 queries per group evaluation (66% reduction) --- pkg/ruler/compat.go | 12 +- pkg/ruler/manager.go | 16 ++ pkg/ruler/prefetch_queryable.go | 162 +++++++++++++++++ pkg/ruler/prefetch_queryable_test.go | 169 ++++++++++++++++++ pkg/ruler/ruler.go | 7 + pkg/ruler/select_merger.go | 183 ++++++++++++++++++++ pkg/ruler/select_merger_bench_test.go | 105 +++++++++++ pkg/ruler/select_merger_integration_test.go | 63 +++++++ pkg/ruler/select_merger_test.go | 141 +++++++++++++++ 9 files changed, 856 insertions(+), 2 deletions(-) create mode 100644 pkg/ruler/prefetch_queryable.go create mode 100644 pkg/ruler/prefetch_queryable_test.go create mode 100644 pkg/ruler/select_merger.go create mode 100644 pkg/ruler/select_merger_bench_test.go create mode 100644 pkg/ruler/select_merger_integration_test.go create mode 100644 pkg/ruler/select_merger_test.go diff --git a/pkg/ruler/compat.go b/pkg/ruler/compat.go index ff9a5995b2b..10a546cca4c 100644 --- a/pkg/ruler/compat.go +++ b/pkg/ruler/compat.go @@ -471,11 +471,19 @@ func buildQueryFunc( failedQueries := metrics.FailedQueriesVec.WithLabelValues(userID) metricsFunc := metricsQueryFunc(baseQueryFunc, totalQueries, failedQueries) + var qf rules.QueryFunc // apply statistic middleware if cfg.EnableQueryStats { - return recordAndReportRuleQueryMetrics(metricsFunc, userID, metrics, logger) + qf = recordAndReportRuleQueryMetrics(metricsFunc, userID, metrics, logger) + } else { + qf = metricsFunc + } + + // apply select merger wrapper (checks context for prefetch cache) + if cfg.SelectMergerEnabled { + qf = selectMergerQueryFunc(qf) } - return metricsFunc + return qf } type QueryableError struct { diff --git a/pkg/ruler/manager.go b/pkg/ruler/manager.go index 86611201899..8ae16ce60e1 100644 --- a/pkg/ruler/manager.go +++ b/pkg/ruler/manager.go @@ -134,6 +134,9 @@ func NewDefaultMultiTenantManager(cfg Config, limits RulesLimits, managerFactory logger: logger, ruleGroupIterationFunc: defaultRuleGroupIterationFunc, } + if cfg.SelectMergerEnabled { + m.ruleGroupIterationFunc = mergedSelectIterationFunc(cfg.SelectMergerMinRules) + } if cfg.RulesBackupEnabled() { m.rulesBackupManager = newRulesBackupManager(cfg, logger, reg) } @@ -292,6 +295,19 @@ func defaultRuleGroupIterationFunc(ctx context.Context, g *promRules.Group, eval promRules.DefaultEvalIterationFunc(ctx, g, evalTimestamp) } +// mergedSelectIterationFunc returns a GroupEvalIterationFunc that pre-fetches +// merged selectors before evaluating the group, injecting the cache into +// context so the selectMergerQueryFunc wrapper can serve from it. +func mergedSelectIterationFunc(minRules int) promRules.GroupEvalIterationFunc { + return func(ctx context.Context, g *promRules.Group, evalTimestamp time.Time) { + plan := planMergedSelects(g.Rules(), minRules) + if len(plan) > 0 { + ctx = withSelectMergerPlan(ctx, plan) + } + defaultRuleGroupIterationFunc(ctx, g, evalTimestamp) + } +} + // newManager creates a prometheus rule manager wrapped with a user id // configured storage, appendable, notifier, and instrumentation func (r *DefaultMultiTenantManager) newManager(ctx context.Context, userID string) (RulesManager, error) { diff --git a/pkg/ruler/prefetch_queryable.go b/pkg/ruler/prefetch_queryable.go new file mode 100644 index 00000000000..33c836baf9b --- /dev/null +++ b/pkg/ruler/prefetch_queryable.go @@ -0,0 +1,162 @@ +package ruler + +import ( + "context" + "sync" + "time" + + "github.com/prometheus/prometheus/model/labels" + "github.com/prometheus/prometheus/promql" + "github.com/prometheus/prometheus/promql/parser" + "github.com/prometheus/prometheus/rules" +) + +type selectMergerCtxKey struct{} + +// selectMergerState is injected into context by the iteration func. +// The QueryFunc wrapper lazily executes the prefetch on first access. +type selectMergerState struct { + plan []mergedSelect + once sync.Once + cache *prefetchCache +} + +func withSelectMergerPlan(ctx context.Context, plan []mergedSelect) context.Context { + return context.WithValue(ctx, selectMergerCtxKey{}, &selectMergerState{plan: plan}) +} + +// selectMergerQueryFunc wraps a QueryFunc to check context for a merge plan. +// On first call, it lazily pre-fetches using the inner QueryFunc, then serves from cache. +func selectMergerQueryFunc(inner rules.QueryFunc) rules.QueryFunc { + return func(ctx context.Context, qs string, t time.Time) (promql.Vector, error) { + state, _ := ctx.Value(selectMergerCtxKey{}).(*selectMergerState) + if state == nil { + return inner(ctx, qs, t) + } + + // Lazy prefetch: execute plan on first call. + state.once.Do(func() { + state.cache = executePrefetch(ctx, state.plan, inner, t) + }) + + if state.cache != nil { + selectors := extractSelectorsFromExpr(qs) + if len(selectors) == 1 { + if vec, ok := state.cache.get(selectors[0]); ok { + return vec, nil + } + } + } + return inner(ctx, qs, t) + } +} + +// prefetchEntry holds pre-fetched results for a merged selector. +type prefetchEntry struct { + matchers []*labels.Matcher + vector promql.Vector +} + +// prefetchCache holds all pre-fetched data for a single group evaluation. +type prefetchCache struct { + entries []prefetchEntry +} + +func (c *prefetchCache) get(queryMatchers []*labels.Matcher) (promql.Vector, bool) { + for _, e := range c.entries { + if isMatcherSetSuperset(e.matchers, queryMatchers) { + extra := extraMatchers(e.matchers, queryMatchers) + if len(extra) == 0 { + return e.vector, true + } + return filterVector(e.vector, extra), true + } + } + return nil, false +} + +func filterVector(vec promql.Vector, filters []*labels.Matcher) promql.Vector { + var result promql.Vector + for _, s := range vec { + if matchesAll(s.Metric, filters) { + result = append(result, s) + } + } + return result +} + +// executePrefetch runs the merged selectors via QueryFunc and populates a cache. +// Called without cache in context, so inner falls through to the real query. +func executePrefetch(ctx context.Context, plan []mergedSelect, queryFunc rules.QueryFunc, ts time.Time) *prefetchCache { + // Remove the state from context to prevent recursion during prefetch. + ctx = context.WithValue(ctx, selectMergerCtxKey{}, (*selectMergerState)(nil)) + cache := &prefetchCache{} + for _, ms := range plan { + vec, err := queryFunc(ctx, ms.prefetchExpr, ts) + if err != nil { + continue + } + cache.entries = append(cache.entries, prefetchEntry{ + matchers: ms.mergedMatchers, + vector: vec, + }) + } + return cache +} + +func isMatcherSetSuperset(superMatchers, subMatchers []*labels.Matcher) bool { + for _, sup := range superMatchers { + found := false + for _, sub := range subMatchers { + if sub.Name == sup.Name { + found = true + if !isMatcherSuperset(sup, sub) { + return false + } + break + } + } + if !found { + return false + } + } + return true +} + +func extraMatchers(entryMatchers, queryMatchers []*labels.Matcher) []*labels.Matcher { + var result []*labels.Matcher + for _, qm := range queryMatchers { + isExtra := true + for _, em := range entryMatchers { + if em.Name == qm.Name && em.Type == qm.Type && em.Value == qm.Value { + isExtra = false + break + } + } + if isExtra { + result = append(result, qm) + } + } + return result +} + +func matchesAll(lset labels.Labels, matchers []*labels.Matcher) bool { + for _, m := range matchers { + if !m.Matches(lset.Get(m.Name)) { + return false + } + } + return true +} + +func extractSelectorsFromExpr(qs string) [][]*labels.Matcher { + expr, err := parser.ParseExpr(qs) + if err != nil { + return nil + } + var result [][]*labels.Matcher + extractSelectors(expr, func(vs *parser.VectorSelector) { + result = append(result, vs.LabelMatchers) + }) + return result +} diff --git a/pkg/ruler/prefetch_queryable_test.go b/pkg/ruler/prefetch_queryable_test.go new file mode 100644 index 00000000000..613738932d1 --- /dev/null +++ b/pkg/ruler/prefetch_queryable_test.go @@ -0,0 +1,169 @@ +package ruler + +import ( + "context" + "testing" + "time" + + "github.com/prometheus/prometheus/model/labels" + "github.com/prometheus/prometheus/promql" + "github.com/prometheus/prometheus/rules" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" +) + +func TestPrefetchCache_FindSuperset(t *testing.T) { + cache := &prefetchCache{ + entries: []prefetchEntry{ + { + matchers: []*labels.Matcher{ + labels.MustNewMatcher(labels.MatchEqual, "__name__", "http"), + labels.MustNewMatcher(labels.MatchRegexp, "job", ".*"), + }, + vector: promql.Vector{ + {Metric: labels.FromStrings("__name__", "http", "job", "api"), T: 1000, F: 1.0}, + {Metric: labels.FromStrings("__name__", "http", "job", "web"), T: 1000, F: 2.0}, + }, + }, + }, + } + + queryMatchers := []*labels.Matcher{ + labels.MustNewMatcher(labels.MatchEqual, "__name__", "http"), + labels.MustNewMatcher(labels.MatchEqual, "job", "api"), + } + + vec, ok := cache.get(queryMatchers) + require.True(t, ok) + assert.Len(t, vec, 1) + assert.Equal(t, "api", vec[0].Metric.Get("job")) + assert.Equal(t, 1.0, vec[0].F) +} + +func TestIsMatcherSetSuperset(t *testing.T) { + tests := []struct { + name string + super []*labels.Matcher + sub []*labels.Matcher + want bool + }{ + { + name: "identical sets", + super: []*labels.Matcher{labels.MustNewMatcher(labels.MatchEqual, "job", "api")}, + sub: []*labels.Matcher{labels.MustNewMatcher(labels.MatchEqual, "job", "api")}, + want: true, + }, + { + name: "regex superset of equal", + super: []*labels.Matcher{labels.MustNewMatcher(labels.MatchRegexp, "job", ".*")}, + sub: []*labels.Matcher{labels.MustNewMatcher(labels.MatchEqual, "job", "api")}, + want: true, + }, + { + name: "super has extra label — more restrictive, not superset", + super: []*labels.Matcher{ + labels.MustNewMatcher(labels.MatchEqual, "job", "api"), + labels.MustNewMatcher(labels.MatchEqual, "env", "prod"), + }, + sub: []*labels.Matcher{labels.MustNewMatcher(labels.MatchEqual, "job", "api")}, + want: false, + }, + { + name: "sub has extra label — super is broader", + super: []*labels.Matcher{labels.MustNewMatcher(labels.MatchEqual, "job", "api")}, + sub: []*labels.Matcher{ + labels.MustNewMatcher(labels.MatchEqual, "job", "api"), + labels.MustNewMatcher(labels.MatchEqual, "env", "prod"), + }, + want: true, + }, + { + name: "different values — not superset", + super: []*labels.Matcher{labels.MustNewMatcher(labels.MatchEqual, "job", "web")}, + sub: []*labels.Matcher{labels.MustNewMatcher(labels.MatchEqual, "job", "api")}, + want: false, + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + assert.Equal(t, tt.want, isMatcherSetSuperset(tt.super, tt.sub)) + }) + } +} + +func TestSelectMergerQueryFunc(t *testing.T) { + allSeries := promql.Vector{ + {Metric: labels.FromStrings("__name__", "http_requests", "job", "api"), T: 1000, F: 10.0}, + {Metric: labels.FromStrings("__name__", "http_requests", "job", "web"), T: 1000, F: 20.0}, + } + + innerCalled := 0 + inner := func(_ context.Context, qs string, _ time.Time) (promql.Vector, error) { + innerCalled++ + return allSeries, nil + } + + qf := selectMergerQueryFunc(inner) + + // Without plan in context — falls through to inner. + vec, err := qf(context.Background(), `http_requests{job="api"}`, time.Unix(1, 0)) + require.NoError(t, err) + assert.Equal(t, 1, innerCalled) + assert.Len(t, vec, 2) // inner returns all + + // With plan in context — lazy prefetch then serve from cache. + innerCalled = 0 + plan := []mergedSelect{ + { + metricName: "http_requests", + mergedMatchers: []*labels.Matcher{ + labels.MustNewMatcher(labels.MatchEqual, "__name__", "http_requests"), + labels.MustNewMatcher(labels.MatchRegexp, "job", ".*"), + }, + prefetchExpr: `http_requests{job=~".*"}`, + }, + } + ctx := withSelectMergerPlan(context.Background(), plan) + + // First call triggers prefetch (1 inner call), then serves filtered result. + vec, err = qf(ctx, `http_requests{job="api"}`, time.Unix(1, 0)) + require.NoError(t, err) + assert.Equal(t, 1, innerCalled) // prefetch call + assert.Len(t, vec, 1) + assert.Equal(t, "api", vec[0].Metric.Get("job")) + + // Second call — served from cache, no additional inner call. + vec, err = qf(ctx, `http_requests{job="web"}`, time.Unix(1, 0)) + require.NoError(t, err) + assert.Equal(t, 1, innerCalled) // still 1 + assert.Len(t, vec, 1) + assert.Equal(t, "web", vec[0].Metric.Get("job")) + + // Query not in cache — falls through to inner. + vec, err = qf(ctx, `other_metric{job="api"}`, time.Unix(1, 0)) + require.NoError(t, err) + assert.Equal(t, 2, innerCalled) +} + +func TestExecutePrefetch(t *testing.T) { + queryFunc := func(_ context.Context, _ string, ts time.Time) (promql.Vector, error) { + return promql.Vector{ + {Metric: labels.FromStrings("__name__", "http", "job", "api"), T: ts.UnixMilli(), F: 5.0}, + }, nil + } + + plan := []mergedSelect{ + { + metricName: "http", + mergedMatchers: []*labels.Matcher{ + labels.MustNewMatcher(labels.MatchEqual, "__name__", "http"), + }, + prefetchExpr: `http{job=~".*"}`, + }, + } + + cache := executePrefetch(context.Background(), plan, rules.QueryFunc(queryFunc), time.Unix(1, 0)) + require.Len(t, cache.entries, 1) + assert.Len(t, cache.entries[0].vector, 1) +} diff --git a/pkg/ruler/ruler.go b/pkg/ruler/ruler.go index 82f7c57fb0d..c2e15589844 100644 --- a/pkg/ruler/ruler.go +++ b/pkg/ruler/ruler.go @@ -182,6 +182,9 @@ type Config struct { ThanosEngine engine.ThanosEngineConfig `yaml:"thanos_engine"` + SelectMergerEnabled bool `yaml:"select_merger_enabled"` + SelectMergerMinRules int `yaml:"select_merger_min_rules"` + // NameValidationScheme is the scheme for validating metric and label names (set from root config). NameValidationScheme model.ValidationScheme `yaml:"-"` } @@ -273,6 +276,10 @@ func (cfg *Config) RegisterFlags(f *flag.FlagSet) { f.BoolVar(&cfg.EnableHAEvaluation, "ruler.enable-ha-evaluation", false, "Enable high availability") f.DurationVar(&cfg.LivenessCheckTimeout, "ruler.liveness-check-timeout", 1*time.Second, "Timeout duration for non-primary rulers during liveness checks. If the check times out, the non-primary ruler will evaluate the rule group. Applicable when ruler.enable-ha-evaluation is true.") + + f.BoolVar(&cfg.SelectMergerEnabled, "ruler.select-merger-enabled", false, "Enable merged select pre-fetching to reduce redundant queries for rules sharing the same metric.") + f.IntVar(&cfg.SelectMergerMinRules, "ruler.select-merger-min-rules", 3, "Minimum number of rules querying the same metric to trigger merged pre-fetching.") + cfg.RingCheckPeriod = 5 * time.Second } diff --git a/pkg/ruler/select_merger.go b/pkg/ruler/select_merger.go new file mode 100644 index 00000000000..450d352fc47 --- /dev/null +++ b/pkg/ruler/select_merger.go @@ -0,0 +1,183 @@ +package ruler + +import ( + "strings" + + "github.com/prometheus/prometheus/model/labels" + "github.com/prometheus/prometheus/promql/parser" + "github.com/prometheus/prometheus/rules" +) + +type mergedSelect struct { + metricName string + mergedMatchers []*labels.Matcher + originalEntries [][]*labels.Matcher // per-rule matchers + prefetchExpr string // expression to evaluate for pre-fetch +} + +func planMergedSelects(rls []rules.Rule, minRules int) []mergedSelect { + // Group selectors by metric name AND expression structure. + // The "structure key" is the expression with matchers blanked out, + // so rate(m{a="1"}[5m]) and rate(m{a="2"}[5m]) share a key but + // sum(m{a="1"}) does not. + type entry struct { + matchers []*labels.Matcher + exprStr string + } + // Key: metricName + "\x00" + expression structure + groups := map[string][]entry{} + + for _, r := range rls { + exprStr := r.Query().String() + extractSelectors(r.Query(), func(vs *parser.VectorSelector) { + name := metricNameFromMatchers(vs.LabelMatchers) + if name == "" { + return + } + key := name + "\x00" + exprStructureKey(r.Query(), vs) + groups[key] = append(groups[key], entry{matchers: vs.LabelMatchers, exprStr: exprStr}) + }) + } + + var result []mergedSelect + for _, entries := range groups { + if len(entries) < minRules { + continue + } + originals := make([][]*labels.Matcher, len(entries)) + for i, e := range entries { + originals[i] = e.matchers + } + merged := computeMergedMatchers(originals) + + // Find the expression whose matchers equal the merged set (the broadest rule). + // If none found, skip — we can't safely pre-fetch without a full expression. + prefetchExpr := "" + for _, e := range entries { + if matchersEqual(e.matchers, merged) { + prefetchExpr = e.exprStr + break + } + } + if prefetchExpr == "" { + continue + } + + name := metricNameFromMatchers(merged) + result = append(result, mergedSelect{ + metricName: name, + mergedMatchers: merged, + originalEntries: originals, + prefetchExpr: prefetchExpr, + }) + } + return result +} + +// exprStructureKey returns a string representing the expression structure +// with the VectorSelector's matchers replaced by a placeholder. +// Two expressions with the same structure key differ only in their matchers. +func exprStructureKey(expr parser.Expr, vs *parser.VectorSelector) string { + full := expr.String() + selectorStr := vs.String() + // Replace the specific selector with a placeholder. + return strings.Replace(full, selectorStr, "{__PLACEHOLDER__}", 1) +} + +// matchersEqual returns true if two matcher slices contain the same matchers (order-independent). +func matchersEqual(a, b []*labels.Matcher) bool { + if len(a) != len(b) { + return false + } + for _, am := range a { + found := false + for _, bm := range b { + if am.Name == bm.Name && am.Type == bm.Type && am.Value == bm.Value { + found = true + break + } + } + if !found { + return false + } + } + return true +} + +func extractSelectors(expr parser.Expr, fn func(*parser.VectorSelector)) { + parser.Inspect(expr, func(node parser.Node, _ []parser.Node) error { + if vs, ok := node.(*parser.VectorSelector); ok { + fn(vs) + } + return nil + }) +} + +func metricNameFromMatchers(ms []*labels.Matcher) string { + for _, m := range ms { + if m.Name == labels.MetricName && m.Type == labels.MatchEqual { + return m.Value + } + } + return "" +} + +func computeMergedMatchers(entries [][]*labels.Matcher) []*labels.Matcher { + // Collect all label names across entries, but only keep labels + // that appear in ALL entries. A label missing from any entry means + // that entry matches all values for that label — including it in the + // merged set would make the pre-fetch too restrictive. + labelMatchers := map[string][]*labels.Matcher{} + for _, ms := range entries { + for _, m := range ms { + labelMatchers[m.Name] = append(labelMatchers[m.Name], m) + } + } + + var result []*labels.Matcher + for _, ms := range labelMatchers { + // Only include if present in ALL entries. + if len(ms) != len(entries) { + continue + } + if sup := findSuperset(ms); sup != nil { + result = append(result, sup) + } + } + return result +} + +func findSuperset(ms []*labels.Matcher) *labels.Matcher { + for _, candidate := range ms { + coversAll := true + for _, other := range ms { + if !isMatcherSuperset(candidate, other) { + coversAll = false + break + } + } + if coversAll { + return candidate + } + } + return nil +} + +func isMatcherSuperset(a, b *labels.Matcher) bool { + if a.Name != b.Name { + return false + } + // =~".*" covers everything. + if a.Type == labels.MatchRegexp && a.Value == ".*" { + return true + } + // =~".+" covers any non-empty value. + if a.Type == labels.MatchRegexp && a.Value == ".+" { + if b.Type == labels.MatchEqual && b.Value == "" { + return false + } + return true + } + // Same type and value covers itself. + return a.Type == b.Type && a.Value == b.Value +} diff --git a/pkg/ruler/select_merger_bench_test.go b/pkg/ruler/select_merger_bench_test.go new file mode 100644 index 00000000000..b203dacb410 --- /dev/null +++ b/pkg/ruler/select_merger_bench_test.go @@ -0,0 +1,105 @@ +package ruler + +import ( + "context" + "fmt" + "sync/atomic" + "testing" + "time" + + "github.com/prometheus/prometheus/model/labels" + "github.com/prometheus/prometheus/promql" + "github.com/prometheus/prometheus/promql/parser" + "github.com/prometheus/prometheus/rules" +) + +// BenchmarkSelectMerger simulates 114 rules (38 metrics × 3 deploy_color variants) +// and compares query count with vs without merging. +func BenchmarkSelectMerger(b *testing.B) { + const ( + numMetrics = 38 + numColors = 3 + seriesPer = 50 + ) + colors := []string{"blue", "green", "canary"} + + // Build 114 rules: 38 metrics × 3 deploy_color variants. + // One variant uses =~".*" (superset) to enable merging. + rls := make([]rules.Rule, 0, numMetrics*numColors) + for i := range numMetrics { + for ci, c := range colors { + var expr string + if ci == numColors-1 { + // Last color variant uses regex superset — enables merging. + expr = fmt.Sprintf(`sum(metric_%d{deploy_color=~".*",job="svc"})`, i) + } else { + expr = fmt.Sprintf(`sum(metric_%d{deploy_color="%s",job="svc"})`, i, c) + } + _ = c + e, err := parser.ParseExpr(expr) + if err != nil { + b.Fatal(err) + } + rls = append(rls, &fakeRule{expr: e}) + } + } + + // Mock QueryFunc: returns seriesPer samples with varying deploy_color. + mockQueryFunc := func(_ context.Context, qs string, _ time.Time) (promql.Vector, error) { + vec := make(promql.Vector, seriesPer) + for i := range vec { + vec[i] = promql.Sample{ + Metric: labels.FromStrings( + "__name__", "metric_0", + "deploy_color", colors[i%numColors], + "job", "svc", + "instance", fmt.Sprintf("host-%d", i), + ), + T: 0, + F: float64(i), + } + } + return vec, nil + } + + ts := time.Now() + ctx := context.Background() + + b.Run("without_merging", func(b *testing.B) { + var calls atomic.Int64 + qf := func(ctx context.Context, qs string, t time.Time) (promql.Vector, error) { + calls.Add(1) + return mockQueryFunc(ctx, qs, t) + } + for b.Loop() { + calls.Store(0) + for _, r := range rls { + _, err := qf(ctx, r.Query().String(), ts) + if err != nil { + b.Fatal(err) + } + } + } + b.ReportMetric(float64(calls.Load()), "queries/op") + }) + + b.Run("with_merging", func(b *testing.B) { + plan := planMergedSelects(rls, 2) + var calls atomic.Int64 + qf := selectMergerQueryFunc(func(ctx context.Context, qs string, t time.Time) (promql.Vector, error) { + calls.Add(1) + return mockQueryFunc(ctx, qs, t) + }) + for b.Loop() { + calls.Store(0) + evalCtx := withSelectMergerPlan(ctx, plan) + for _, r := range rls { + _, err := qf(evalCtx, r.Query().String(), ts) + if err != nil { + b.Fatal(err) + } + } + } + b.ReportMetric(float64(calls.Load()), "queries/op") + }) +} diff --git a/pkg/ruler/select_merger_integration_test.go b/pkg/ruler/select_merger_integration_test.go new file mode 100644 index 00000000000..aa4685ef637 --- /dev/null +++ b/pkg/ruler/select_merger_integration_test.go @@ -0,0 +1,63 @@ +package ruler + +import ( + "context" + "sync/atomic" + "testing" + "time" + + "github.com/prometheus/prometheus/model/labels" + "github.com/prometheus/prometheus/promql" + "github.com/prometheus/prometheus/rules" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" +) + +func TestSelectMerger_Integration(t *testing.T) { + rls := []rules.Rule{ + mustParseRule(t, `rate(http_requests_total{job="api",deploy_color="blue"}[5m])`), + mustParseRule(t, `rate(http_requests_total{job="api",deploy_color="green"}[5m])`), + mustParseRule(t, `rate(http_requests_total{job="api",deploy_color=~".*"}[5m])`), + } + + // Step 1: Plan. + plan := planMergedSelects(rls, 2) + require.Len(t, plan, 1) + assert.Equal(t, "http_requests_total", plan[0].metricName) + + allSeries := promql.Vector{ + {Metric: labels.FromStrings("__name__", "http_requests_total", "job", "api", "deploy_color", "blue"), T: 1000, F: 10}, + {Metric: labels.FromStrings("__name__", "http_requests_total", "job", "api", "deploy_color", "green"), T: 1000, F: 20}, + {Metric: labels.FromStrings("__name__", "http_requests_total", "job", "api", "deploy_color", "red"), T: 1000, F: 30}, + } + + var callCount atomic.Int32 + inner := func(_ context.Context, _ string, _ time.Time) (promql.Vector, error) { + callCount.Add(1) + return allSeries, nil + } + + // Step 2: Create the context-aware QueryFunc and inject plan. + qf := selectMergerQueryFunc(inner) + ctx := withSelectMergerPlan(context.Background(), plan) + ts := time.Now() + + // Step 3: First call triggers lazy prefetch (1 inner call). + vecBlue, err := qf(ctx, `http_requests_total{job="api",deploy_color="blue"}`, ts) + require.NoError(t, err) + assert.Equal(t, int32(1), callCount.Load(), "prefetch should call inner once") + assert.Len(t, vecBlue, 1) + assert.Equal(t, "blue", vecBlue[0].Metric.Get("deploy_color")) + + // Step 4: Subsequent calls served from cache. + vecGreen, err := qf(ctx, `http_requests_total{job="api",deploy_color="green"}`, ts) + require.NoError(t, err) + assert.Equal(t, int32(1), callCount.Load(), "should not call inner again") + assert.Len(t, vecGreen, 1) + assert.Equal(t, "green", vecGreen[0].Metric.Get("deploy_color")) + + vecAll, err := qf(ctx, `http_requests_total{job="api",deploy_color=~".*"}`, ts) + require.NoError(t, err) + assert.Equal(t, int32(1), callCount.Load()) + assert.Len(t, vecAll, 3) +} diff --git a/pkg/ruler/select_merger_test.go b/pkg/ruler/select_merger_test.go new file mode 100644 index 00000000000..903ac2a585d --- /dev/null +++ b/pkg/ruler/select_merger_test.go @@ -0,0 +1,141 @@ +package ruler + +import ( + "testing" + + "github.com/prometheus/prometheus/model/labels" + "github.com/prometheus/prometheus/promql/parser" + "github.com/prometheus/prometheus/rules" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" +) + +type fakeRule struct { + rules.Rule + expr parser.Expr +} + +func (f *fakeRule) Query() parser.Expr { return f.expr } + +func mustParseRule(t *testing.T, expr string) rules.Rule { + t.Helper() + e, err := parser.ParseExpr(expr) + require.NoError(t, err) + return &fakeRule{expr: e} +} + +func TestSelectMerger_Plan_GroupsByMetric(t *testing.T) { + rls := []rules.Rule{ + // http_requests_total: 3 rules with same structure, deploy_color varies, one has =~".*" superset + mustParseRule(t, `sum(http_requests_total{job="api",deploy_color="blue"})`), + mustParseRule(t, `sum(http_requests_total{job="api",deploy_color="green"})`), + mustParseRule(t, `sum(http_requests_total{job="api",deploy_color=~".*"})`), + // cpu_usage: 3 rules with same structure, host varies, one has =~".*" superset + mustParseRule(t, `avg(cpu_usage{host="a"})`), + mustParseRule(t, `avg(cpu_usage{host="b"})`), + mustParseRule(t, `avg(cpu_usage{host=~".*"})`), + } + + result := planMergedSelects(rls, 2) + + // Should produce 2 merged selects: one for http_requests_total, one for cpu_usage + assert.Len(t, result, 2) + + metricNames := map[string]bool{} + for _, ms := range result { + metricNames[ms.metricName] = true + } + assert.True(t, metricNames["http_requests_total"]) + assert.True(t, metricNames["cpu_usage"]) +} + +func TestSelectMerger_SupersetDetection(t *testing.T) { + tests := []struct { + name string + a, b *labels.Matcher + aCoversB bool + }{ + { + name: "regex .* covers equality", + a: labels.MustNewMatcher(labels.MatchRegexp, "job", ".*"), + b: labels.MustNewMatcher(labels.MatchEqual, "job", "blue"), + aCoversB: true, + }, + { + name: "regex .+ covers non-empty equality", + a: labels.MustNewMatcher(labels.MatchRegexp, "job", ".+"), + b: labels.MustNewMatcher(labels.MatchEqual, "job", "blue"), + aCoversB: true, + }, + { + name: "regex .+ does not cover empty", + a: labels.MustNewMatcher(labels.MatchRegexp, "job", ".+"), + b: labels.MustNewMatcher(labels.MatchEqual, "job", ""), + aCoversB: false, + }, + { + name: "same equality covers itself", + a: labels.MustNewMatcher(labels.MatchEqual, "job", "api"), + b: labels.MustNewMatcher(labels.MatchEqual, "job", "api"), + aCoversB: true, + }, + { + name: "different equality does not cover", + a: labels.MustNewMatcher(labels.MatchEqual, "job", "api"), + b: labels.MustNewMatcher(labels.MatchEqual, "job", "web"), + aCoversB: false, + }, + } + + for _, tc := range tests { + t.Run(tc.name, func(t *testing.T) { + assert.Equal(t, tc.aCoversB, isMatcherSuperset(tc.a, tc.b)) + }) + } +} + +func TestSelectMerger_MinRulesThreshold(t *testing.T) { + rls := []rules.Rule{ + mustParseRule(t, `sum(http_requests_total{job="api",color="blue"})`), + mustParseRule(t, `sum(http_requests_total{job="api",color="green"})`), + mustParseRule(t, `sum(http_requests_total{job="api",color=~".*"})`), + mustParseRule(t, `avg(cpu_usage{host="a"})`), + } + + // minRules=2: http_requests_total has 3 rules with superset, cpu_usage has only 1 + result := planMergedSelects(rls, 2) + require.Len(t, result, 1) + assert.Equal(t, "http_requests_total", result[0].metricName) + + // minRules=4: nothing qualifies + result = planMergedSelects(rls, 4) + assert.Len(t, result, 0) +} + +func TestSelectMerger_DifferentExprStructures_NotMerged(t *testing.T) { + // Same metric but different expression structures should NOT be merged. + rls := []rules.Rule{ + mustParseRule(t, `rate(http_requests_total{job="api",color="blue"}[5m])`), + mustParseRule(t, `rate(http_requests_total{job="api",color=~".*"}[5m])`), + mustParseRule(t, `sum(http_requests_total{job="api",color="red"})`), + } + + result := planMergedSelects(rls, 2) + // Only the rate() pair should merge (2 rules with same structure). + // The sum() rule has a different structure. + require.Len(t, result, 1) + assert.Equal(t, "http_requests_total", result[0].metricName) + assert.Contains(t, result[0].prefetchExpr, "rate") +} + +func TestSelectMerger_NoSuperset_NotMerged(t *testing.T) { + // When no rule has a superset matcher, merging is skipped. + rls := []rules.Rule{ + mustParseRule(t, `sum(http_requests_total{job="api"})`), + mustParseRule(t, `sum(http_requests_total{job="web"})`), + } + + result := planMergedSelects(rls, 2) + // job="api" and job="web" have no superset, so no prefetchExpr can be found. + assert.Len(t, result, 0) +}