diff --git a/client/go.mod b/client/go.mod index 205e08decc3..9bc83b7043f 100644 --- a/client/go.mod +++ b/client/go.mod @@ -26,6 +26,7 @@ require ( github.com/cespare/xxhash/v2 v2.3.0 // indirect github.com/davecgh/go-spew v1.1.1 // indirect github.com/golang/protobuf v1.5.4 // indirect + github.com/kylelemons/godebug v1.1.0 // indirect github.com/munnerz/goautoneg v0.0.0-20191010083416-a7dc8b61c822 // indirect github.com/pkg/errors v0.9.1 // indirect github.com/pmezard/go-difflib v1.0.0 // indirect diff --git a/client/go.sum b/client/go.sum index c9c66d687db..36f603d7aa9 100644 --- a/client/go.sum +++ b/client/go.sum @@ -38,6 +38,8 @@ github.com/kr/pretty v0.3.1/go.mod h1:hoEshYVHaxMs3cyo3Yncou5ZscifuDolrwPKZanG3x github.com/kr/pty v1.1.1/go.mod h1:pFQYn66WHrOpPYNljwOMqo10TkYh1fy3cYio2l3bCsQ= github.com/kr/text v0.1.0 h1:45sCR5RtlFHMR4UwH9sdQ5TC8v0qDQCHnXt+kaKSTVE= github.com/kr/text v0.1.0/go.mod h1:4Jbv+DJW3UT/LiOwJeYQe1efqtUx/iVham/4vfdArNI= +github.com/kylelemons/godebug v1.1.0 h1:RPNrshWIDI6G2gRW9EHilWtl7Z6Sb1BR0xunSBf0SNc= +github.com/kylelemons/godebug v1.1.0/go.mod h1:9/0rRGxNHcop5bhtWyNeEfOS8JIWk580+fNqagV/RAw= github.com/munnerz/goautoneg v0.0.0-20191010083416-a7dc8b61c822 h1:C3w9PqII01/Oq1c1nUAm88MOHcQC9l5mIlSMApZMrHA= github.com/munnerz/goautoneg v0.0.0-20191010083416-a7dc8b61c822/go.mod h1:+n7T8mK8HuQTcFwEeznm/DIxMOiR9yIdICNftLE1DvQ= github.com/nxadm/tail v1.4.8 h1:nPr65rt6Y5JFSKQO7qToXr7pePgD6Gwiw05lkbyAQTE= diff --git a/client/resource_group/controller/group_controller.go b/client/resource_group/controller/group_controller.go index 4fca2091c00..d0bc2601171 100644 --- a/client/resource_group/controller/group_controller.go +++ b/client/resource_group/controller/group_controller.go @@ -106,6 +106,21 @@ type groupMetricsCollection struct { tokenRequestCounter prometheus.Counter runningKVRequestCounter prometheus.Gauge consumeTokenHistogram prometheus.Observer + + // Paging pre-charge observability: cached per-RG so the hot path avoids + // WithLabelValues on every KV request. Only pre-charged requests (those + // with a PredictedReadBytes hint > 0) contribute to these metrics. + prechargeCounter prometheus.Counter + prechargeBytesCounter prometheus.Counter + actualBytesCounter prometheus.Counter + predictionResidualBytes prometheus.Observer + + // Nonprecharge bucket: RPCs that implemented the predicted-bytes + // interface but reported 0 (EMA cold-start or feature-disabled) and + // therefore skipped pre-charge entirely. Recorded at settle time + // (AfterKVRequest). + nonprechargeCounter prometheus.Counter + nonprechargeActualBytes prometheus.Counter } func initMetrics(oldName, name string) *groupMetricsCollection { @@ -122,9 +137,42 @@ func initMetrics(oldName, name string) *groupMetricsCollection { tokenRequestCounter: metrics.ResourceGroupTokenRequestCounter.WithLabelValues(oldName, name), runningKVRequestCounter: metrics.GroupRunningKVRequestCounter.WithLabelValues(name), consumeTokenHistogram: metrics.TokenConsumedHistogram.WithLabelValues(name), + + prechargeCounter: metrics.PagingPrechargeCounter.WithLabelValues(name), + prechargeBytesCounter: metrics.PagingPrechargeBytesCounter.WithLabelValues(name), + actualBytesCounter: metrics.PagingActualBytesCounter.WithLabelValues(name), + predictionResidualBytes: metrics.PagingPredictionResidualBytes.WithLabelValues(name), + + nonprechargeCounter: metrics.PagingNonprechargeCounter.WithLabelValues(name), + nonprechargeActualBytes: metrics.PagingNonprechargeActualBytes.WithLabelValues(name), } } +// observePagingPrecharge records one pre-charge event. Caller must guarantee +// bytesForEst > 0; cold-start requests with no hint are not pre-charged and +// should not be observed here. +func (gmc *groupMetricsCollection) observePagingPrecharge(bytesForEst uint64) { + gmc.prechargeCounter.Inc() + gmc.prechargeBytesCounter.Add(float64(bytesForEst)) +} + +// observePagingActual records actual read bytes and the signed residual for a +// pre-charged request. Caller must guarantee predicted > 0. +func (gmc *groupMetricsCollection) observePagingActual(predicted, actual uint64) { + gmc.actualBytesCounter.Add(float64(actual)) + gmc.predictionResidualBytes.Observe(float64(actual) - float64(predicted)) +} + +// observePagingNonprecharge records one RPC that implemented the predicted +// read-bytes interface but reported 0 (EMA cold or feature disabled) and +// therefore ran without pre-charge. `actual` is the response's read bytes, +// settled in AfterKVRequest. Paired with observePagingPrecharge this gives +// the cold/ready split and the byte volume that bypassed throttling. +func (gmc *groupMetricsCollection) observePagingNonprecharge(actual uint64) { + gmc.nonprechargeCounter.Inc() + gmc.nonprechargeActualBytes.Add(float64(actual)) +} + type tokenCounter struct { fillRate uint64 @@ -551,6 +599,9 @@ func (gc *groupCostController) onRequestWaitImpl( for _, calc := range gc.calculators { calc.BeforeKVRequest(delta, info) } + if bytesForEst := estimatedReadBytes(info); bytesForEst > 0 { + gc.metrics.observePagingPrecharge(bytesForEst) + } gc.mu.Lock() add(gc.mu.consumption, delta) @@ -601,10 +652,19 @@ func (gc *groupCostController) onResponseImpl( for _, calc := range gc.calculators { calc.AfterKVRequest(delta, req, resp) } + if bytesForEst := estimatedReadBytes(req); bytesForEst > 0 { + gc.metrics.observePagingActual(bytesForEst, resp.ReadBytes()) + } else if !req.IsWrite() { + if _, ok := req.(predictedReadBytesProvider); ok { + gc.metrics.observePagingNonprecharge(resp.ReadBytes()) + } + } if !gc.burstable.Load() { counter := gc.run.requestUnitTokens if v := getRUValueFromConsumption(delta); v > 0 { counter.limiter.RemoveTokens(time.Now(), v) + } else if v < 0 { + counter.limiter.RefundTokens(time.Now(), -v) } } @@ -632,21 +692,33 @@ func (gc *groupCostController) onResponseWaitImpl( for _, calc := range gc.calculators { calc.AfterKVRequest(delta, req, resp) } + if bytesForEst := estimatedReadBytes(req); bytesForEst > 0 { + gc.metrics.observePagingActual(bytesForEst, resp.ReadBytes()) + } else if !req.IsWrite() { + if _, ok := req.(predictedReadBytesProvider); ok { + gc.metrics.observePagingNonprecharge(resp.ReadBytes()) + } + } var waitDuration time.Duration if !gc.burstable.Load() { - allowDebt := delta.ReadBytes+delta.WriteBytes < bigRequestThreshold || !gc.isThrottled.Load() - d, err := gc.acquireTokens(ctx, delta, &waitDuration, allowDebt) - if err != nil { - if errs.ErrClientResourceGroupThrottled.Equal(err) { - gc.metrics.failedRequestCounterWithThrottled.Inc() - gc.metrics.failedLimitReserveDuration.Observe(d.Seconds()) - } else { - gc.metrics.failedRequestCounterWithOthers.Inc() + v := getRUValueFromConsumption(delta) + if v > 0 { + allowDebt := delta.ReadBytes+delta.WriteBytes < bigRequestThreshold || !gc.isThrottled.Load() + d, err := gc.acquireTokens(ctx, delta, &waitDuration, allowDebt) + if err != nil { + if errs.ErrClientResourceGroupThrottled.Equal(err) { + gc.metrics.failedRequestCounterWithThrottled.Inc() + gc.metrics.failedLimitReserveDuration.Observe(d.Seconds()) + } else { + gc.metrics.failedRequestCounterWithOthers.Inc() + } + return nil, waitDuration, err } - return nil, waitDuration, err + gc.metrics.successfulRequestDuration.Observe(d.Seconds()) + waitDuration += d + } else if v < 0 { + gc.run.requestUnitTokens.limiter.RefundTokens(time.Now(), -v) } - gc.metrics.successfulRequestDuration.Observe(d.Seconds()) - waitDuration += d } gc.mu.Lock() diff --git a/client/resource_group/controller/group_controller_test.go b/client/resource_group/controller/group_controller_test.go index f916f5e56c4..a1a3622a1d3 100644 --- a/client/resource_group/controller/group_controller_test.go +++ b/client/resource_group/controller/group_controller_test.go @@ -20,11 +20,13 @@ import ( "testing" "time" + "github.com/prometheus/client_golang/prometheus/testutil" "github.com/stretchr/testify/require" rmpb "github.com/pingcap/kvproto/pkg/resource_manager" "github.com/tikv/pd/client/errs" + "github.com/tikv/pd/client/resource_group/controller/metrics" ) func createTestGroupCostController(re *require.Assertions) *groupCostController { @@ -208,6 +210,312 @@ func TestOnResponseWaitConsumption(t *testing.T) { verify() } +func TestPredictedReadBytesPreCharge(t *testing.T) { + re := require.New(t) + cfg := DefaultRUConfig() + kvCalc := newKVCalculator(cfg) + + // BeforeKVRequest with a PredictedReadBytes hint should pre-charge + // baseCost + predictedReadBytes * ReadBytesCost, regardless of + // whether PagingSizeBytes is set. The two are decoupled. + pagingSizeBytes := uint64(4 * 1024 * 1024) // protocol-level cap only + predictedReadBytes := uint64(256 * 1024) // learned EMA estimate + req := &TestRequestInfo{ + isWrite: false, + pagingSizeBytes: pagingSizeBytes, + predictedReadBytes: predictedReadBytes, + } + precharge := &rmpb.Consumption{} + kvCalc.BeforeKVRequest(precharge, req) + + baseCost := float64(cfg.ReadBaseCost) + float64(cfg.ReadPerBatchBaseCost)*defaultAvgBatchProportion + hintCost := float64(cfg.ReadBytesCost) * float64(predictedReadBytes) + re.InDelta(baseCost+hintCost, precharge.RRU, 1e-6, + "BeforeKVRequest should pre-charge based on PredictedReadBytes") + + // AfterKVRequest should subtract the same hint basis, preserving + // precharge + settle == baseCost + actualCost. + actualReadBytes := uint64(300 * 1024) // close to the prediction + resp := &TestResponseInfo{ + readBytes: actualReadBytes, + kvCPU: 10 * time.Millisecond, + succeed: true, + } + settle := &rmpb.Consumption{} + kvCalc.AfterKVRequest(settle, req, resp) + + actualReadCost := float64(cfg.ReadBytesCost) * float64(actualReadBytes) + cpuCost := float64(cfg.CPUMsCost) * 10.0 + re.InDelta(actualReadCost+cpuCost-hintCost, settle.RRU, 1e-6, + "AfterKVRequest should settle using the same hint basis as BeforeKVRequest") + + totalRRU := precharge.RRU + settle.RRU + re.InDelta(baseCost+actualReadCost+cpuCost, totalRRU, 1e-6, + "Total RRU across precharge+settle should equal baseCost + actualCost") +} + +func TestNoPreChargeWithoutPredictedReadBytes(t *testing.T) { + re := require.New(t) + cfg := DefaultRUConfig() + kvCalc := newKVCalculator(cfg) + baseCost := float64(cfg.ReadBaseCost) + float64(cfg.ReadPerBatchBaseCost)*defaultAvgBatchProportion + + // With PagingSizeBytes but no PredictedReadBytes hint, BeforeKVRequest + // must NOT pre-charge: the two quantities are decoupled. AfterKVRequest + // bills the actual read bytes only. + pagingSizeBytes := uint64(4 * 1024 * 1024) + reqPagingOnly := &TestRequestInfo{ + isWrite: false, + pagingSizeBytes: pagingSizeBytes, + } + precharge := &rmpb.Consumption{} + kvCalc.BeforeKVRequest(precharge, reqPagingOnly) + re.InDelta(baseCost, precharge.RRU, 1e-6, + "PagingSizeBytes alone must not trigger pre-charge") + + actualReadBytes := uint64(2 * 1024 * 1024) + resp := &TestResponseInfo{ + readBytes: actualReadBytes, + kvCPU: 10 * time.Millisecond, + succeed: true, + } + settle := &rmpb.Consumption{} + kvCalc.AfterKVRequest(settle, reqPagingOnly, resp) + + actualReadCost := float64(cfg.ReadBytesCost) * float64(actualReadBytes) + cpuCost := float64(cfg.CPUMsCost) * 10.0 + re.InDelta(actualReadCost+cpuCost, settle.RRU, 1e-6, + "AfterKVRequest should bill actual read cost only when nothing was pre-charged") + + // Bare request without any hint or paging also pre-charges nothing. + reqNone := &TestRequestInfo{isWrite: false} + prechargeNone := &rmpb.Consumption{} + kvCalc.BeforeKVRequest(prechargeNone, reqNone) + re.InDelta(baseCost, prechargeNone.RRU, 1e-6, + "Without hint or paging, BeforeKVRequest should only charge baseCost") +} + +func TestPagingPreChargeTokenRefund(t *testing.T) { + re := require.New(t) + gc := createTestGroupCostController(re) + + // Give the limiter a known amount of tokens with no fill rate for precise measurement. + initialTokens := float64(100000) + gc.run.requestUnitTokens.limiter.Reconfigure(time.Now(), tokenBucketReconfigureArgs{ + newTokens: initialTokens, + newFillRate: 0, + newBurst: 0, + }) + + predictedReadBytes := uint64(4 * 1024 * 1024) // 4 MB pre-charge + actualReadBytes := uint64(1 * 1024 * 1024) // 1 MB actual + + req := &TestRequestInfo{ + isWrite: false, + predictedReadBytes: predictedReadBytes, + } + resp := &TestResponseInfo{ + readBytes: actualReadBytes, + succeed: true, + } + + // Pre-charge reserves tokens from the limiter. + _, _, _, _, err := gc.onRequestWaitImpl(context.TODO(), req) + re.NoError(err) + tokensAfterPreCharge := gc.run.requestUnitTokens.limiter.AvailableTokens(time.Now()) + + // Response settlement should refund excess tokens. + _, _, err = gc.onResponseWaitImpl(context.TODO(), req, resp) + re.NoError(err) + tokensAfterSettlement := gc.run.requestUnitTokens.limiter.AvailableTokens(time.Now()) + + // The limiter should have more tokens after settlement than after pre-charge, + // because the refund (pre-charge - actual) exceeds the actual read cost. + cfg := DefaultRUConfig() + preChargeCost := float64(cfg.ReadBytesCost) * float64(predictedReadBytes) + actualCost := float64(cfg.ReadBytesCost) * float64(actualReadBytes) + expectedRefund := preChargeCost - actualCost + re.Positive(expectedRefund, "sanity: pre-charge should exceed actual cost") + + re.InDelta(tokensAfterPreCharge+expectedRefund, tokensAfterSettlement, 1.0, + "limiter should be refunded the excess pre-charged tokens") + + // Verify net consumption is correct: baseCost + actualReadCost. + gc.mu.Lock() + netRRU := gc.mu.consumption.RRU + gc.mu.Unlock() + baseCost := float64(cfg.ReadBaseCost) + float64(cfg.ReadPerBatchBaseCost)*defaultAvgBatchProportion + expectedNetRRU := baseCost + actualCost + re.InDelta(expectedNetRRU, netRRU, 1e-6, + "net consumption should equal baseCost + actualReadCost") +} + +func TestPagingPreChargeNoRefundWhenActualExceedsEstimate(t *testing.T) { + re := require.New(t) + gc := createTestGroupCostController(re) + + initialTokens := float64(100000) + gc.run.requestUnitTokens.limiter.Reconfigure(time.Now(), tokenBucketReconfigureArgs{ + newTokens: initialTokens, + newFillRate: 0, + newBurst: 0, + }) + + predictedReadBytes := uint64(1 * 1024 * 1024) // 1 MB pre-charge + actualReadBytes := uint64(4 * 1024 * 1024) // 4 MB actual (exceeds estimate) + + req := &TestRequestInfo{ + isWrite: false, + predictedReadBytes: predictedReadBytes, + } + resp := &TestResponseInfo{ + readBytes: actualReadBytes, + succeed: true, + } + + _, _, _, _, err := gc.onRequestWaitImpl(context.TODO(), req) + re.NoError(err) + tokensAfterPreCharge := gc.run.requestUnitTokens.limiter.AvailableTokens(time.Now()) + + _, _, err = gc.onResponseWaitImpl(context.TODO(), req, resp) + re.NoError(err) + tokensAfterSettlement := gc.run.requestUnitTokens.limiter.AvailableTokens(time.Now()) + + // Actual exceeds pre-charge, so settlement should consume more tokens (not refund). + re.Less(tokensAfterSettlement, tokensAfterPreCharge, + "when actual exceeds pre-charge, settlement should consume tokens") +} + +func TestOnResponseImplPagingRefund(t *testing.T) { + re := require.New(t) + gc := createTestGroupCostController(re) + + initialTokens := float64(100000) + gc.run.requestUnitTokens.limiter.Reconfigure(time.Now(), tokenBucketReconfigureArgs{ + newTokens: initialTokens, + newFillRate: 0, + newBurst: 0, + }) + + predictedReadBytes := uint64(4 * 1024 * 1024) // 4 MB pre-charge + actualReadBytes := uint64(512 * 1024) // 512 KB actual + + req := &TestRequestInfo{ + isWrite: false, + predictedReadBytes: predictedReadBytes, + } + resp := &TestResponseInfo{ + readBytes: actualReadBytes, + succeed: true, + } + + // Pre-charge + _, _, _, _, err := gc.onRequestWaitImpl(context.TODO(), req) + re.NoError(err) + tokensAfterPreCharge := gc.run.requestUnitTokens.limiter.AvailableTokens(time.Now()) + + // Settlement via onResponseImpl (non-waiting path) + _, err = gc.onResponseImpl(req, resp) + re.NoError(err) + tokensAfterSettlement := gc.run.requestUnitTokens.limiter.AvailableTokens(time.Now()) + + // Should have refunded tokens. + re.Greater(tokensAfterSettlement, tokensAfterPreCharge, + "onResponseImpl should refund excess pre-charged tokens") +} + +func TestNonprechargeMetricsRecordedWhenHintZero(t *testing.T) { + re := require.New(t) + gc := createTestGroupCostController(re) + + nonprechargeCounter := metrics.PagingNonprechargeCounter.WithLabelValues(gc.name) + nonprechargeBytes := metrics.PagingNonprechargeActualBytes.WithLabelValues(gc.name) + prechargeCounter := metrics.PagingPrechargeCounter.WithLabelValues(gc.name) + actualBytes := metrics.PagingActualBytesCounter.WithLabelValues(gc.name) + + countBefore := testutil.ToFloat64(nonprechargeCounter) + bytesBefore := testutil.ToFloat64(nonprechargeBytes) + prechargeBefore := testutil.ToFloat64(prechargeCounter) + actualBytesBefore := testutil.ToFloat64(actualBytes) + + // Hint=0 read: implements predictedReadBytesProvider but returns 0 -> + // BeforeKVRequest skips pre-charge, AfterKVRequest must record + // nonprecharge bytes. + const readBytesAmount = uint64(256 * 1024) + coldReq := &TestRequestInfo{isWrite: false} + coldResp := &TestResponseInfo{readBytes: readBytesAmount, succeed: true} + + _, err := gc.onResponseImpl(coldReq, coldResp) + re.NoError(err) + re.InDelta(countBefore+1, testutil.ToFloat64(nonprechargeCounter), 1e-6, + "hint=0 read should increment nonprecharge counter") + re.InDelta(bytesBefore+float64(readBytesAmount), + testutil.ToFloat64(nonprechargeBytes), 1e-6, + "nonprecharge bytes must equal AfterKVRequest actual read bytes") + re.InDelta(prechargeBefore, testutil.ToFloat64(prechargeCounter), 1e-6, + "precharge counter must stay unchanged when hint=0") + re.InDelta(actualBytesBefore, testutil.ToFloat64(actualBytes), 1e-6, + "warm-path actual-bytes counter must stay unchanged when hint=0") + + // Sanity: a hint>0 read should hit the warm path, not the cold path. + warmReq := &TestRequestInfo{isWrite: false, predictedReadBytes: 1 * 1024 * 1024} + warmResp := &TestResponseInfo{readBytes: readBytesAmount, succeed: true} + _, _, _, _, err = gc.onRequestWaitImpl(context.TODO(), warmReq) + re.NoError(err) + _, err = gc.onResponseImpl(warmReq, warmResp) + re.NoError(err) + re.InDelta(countBefore+1, testutil.ToFloat64(nonprechargeCounter), 1e-6, + "warm request must not touch nonprecharge counter") + re.InDelta(prechargeBefore+1, testutil.ToFloat64(prechargeCounter), 1e-6, + "warm request should increment precharge counter") + + // Writes skip both buckets entirely. + writeReq := &TestRequestInfo{isWrite: true, writeBytes: 1024} + writeResp := &TestResponseInfo{readBytes: 0, succeed: true} + _, err = gc.onResponseImpl(writeReq, writeResp) + re.NoError(err) + re.InDelta(countBefore+1, testutil.ToFloat64(nonprechargeCounter), 1e-6, + "write must not touch nonprecharge counter") +} + +func TestPagingPreChargeZeroDelta(t *testing.T) { + re := require.New(t) + gc := createTestGroupCostController(re) + + initialTokens := float64(100000) + gc.run.requestUnitTokens.limiter.Reconfigure(time.Now(), tokenBucketReconfigureArgs{ + newTokens: initialTokens, + newFillRate: 0, + newBurst: 0, + }) + + // Set actual read bytes equal to the predicted bytes so the read-bytes + // delta is zero. The only settlement cost should be CPU time (which + // we set to zero here). + predictedReadBytes := uint64(2 * 1024 * 1024) + req := &TestRequestInfo{ + isWrite: false, + predictedReadBytes: predictedReadBytes, + } + resp := &TestResponseInfo{ + readBytes: predictedReadBytes, // exact match + succeed: true, + } + + _, _, _, _, err := gc.onRequestWaitImpl(context.TODO(), req) + re.NoError(err) + tokensAfterPreCharge := gc.run.requestUnitTokens.limiter.AvailableTokens(time.Now()) + + _, _, err = gc.onResponseWaitImpl(context.TODO(), req, resp) + re.NoError(err) + tokensAfterSettlement := gc.run.requestUnitTokens.limiter.AvailableTokens(time.Now()) + + // With zero CPU cost and exact byte match, settlement delta is zero. + // No tokens should be consumed or refunded in the settlement step. + re.InDelta(tokensAfterPreCharge, tokensAfterSettlement, 1e-6, + "exact byte match should produce zero settlement delta") +} + func TestResourceGroupThrottledError(t *testing.T) { re := require.New(t) gc := createTestGroupCostController(re) diff --git a/client/resource_group/controller/limiter.go b/client/resource_group/controller/limiter.go index 86e1440946e..d592210e251 100644 --- a/client/resource_group/controller/limiter.go +++ b/client/resource_group/controller/limiter.go @@ -325,6 +325,26 @@ func (lim *Limiter) RemoveTokens(now time.Time, amount float64) { lim.maybeNotify() } +// RefundTokens returns previously consumed tokens back to the limiter. +// This is the inverse of RemoveTokens: it increases the available token count. +// It is used when a pre-charged estimate (e.g. paging byte budget) exceeds +// the actual cost observed after the request completes. +// +// No burst cap is applied here — consistent with Reconfigure. The lazy cap +// in getTokens() normalizes the balance on the next limiter operation. +// No low-token notification is needed because refunding moves the balance +// away from the low-token threshold, never toward it. +func (lim *Limiter) RefundTokens(now time.Time, amount float64) { + lim.mu.Lock() + defer lim.mu.Unlock() + if lim.burst < 0 || lim.fillRate == Inf { + return + } + _, tokens := lim.getTokens(now) + lim.last = now + lim.tokens = tokens + amount +} + type tokenBucketReconfigureArgs struct { newTokens float64 newFillRate float64 diff --git a/client/resource_group/controller/limiter_test.go b/client/resource_group/controller/limiter_test.go index f5b2c144757..65978093f7e 100644 --- a/client/resource_group/controller/limiter_test.go +++ b/client/resource_group/controller/limiter_test.go @@ -139,6 +139,42 @@ func TestReconfig(t *testing.T) { re.Equal(int64(-1), lim.GetBurst()) } +func TestRefundTokens(t *testing.T) { + re := require.New(t) + nc := make(chan notifyMsg, 1) + lim := NewLimiter(t0, 0, 0, 100, nc) + + // Consume some tokens. + lim.RemoveTokens(t0, 30) + checkTokens(re, lim, t0, 70) + + // Refund part of them. + lim.RefundTokens(t0, 20) + checkTokens(re, lim, t0, 90) + + // Refund beyond the initial amount — allowed (no burst cap when burst==0). + lim.RefundTokens(t0, 50) + checkTokens(re, lim, t0, 140) + + // With burst > 0, refund can temporarily exceed burst, + // but getTokens (called by AvailableTokens) caps it. + limBurst := NewLimiter(t0, 0, 100, 50, nc) + limBurst.RemoveTokens(t0, 30) + checkTokens(re, limBurst, t0, 20) + limBurst.RefundTokens(t0, 80) // tokens = 20 + 80 = 100, but burst = 100 + checkTokens(re, limBurst, t0, 100) + + // Refund beyond burst — capped by getTokens. + limBurst.RefundTokens(t0, 50) // tokens = 100 + 50 = 150, capped to 100 + checkTokens(re, limBurst, t0, 100) + + // Burstable (burst < 0): RefundTokens is a no-op. + limUnlimited := NewLimiter(t0, 0, -1, 100, nc) + limUnlimited.RefundTokens(t0, 50) + // Should still be 100 (burst<0 short-circuits). + checkTokens(re, limUnlimited, t0, 100) +} + func TestNotify(t *testing.T) { nc := make(chan notifyMsg, 1) lim := NewLimiter(t0, 1, 0, 0, nc) diff --git a/client/resource_group/controller/metrics/metrics.go b/client/resource_group/controller/metrics/metrics.go index 01231aceeaf..5bff7281f9e 100644 --- a/client/resource_group/controller/metrics/metrics.go +++ b/client/resource_group/controller/metrics/metrics.go @@ -53,6 +53,31 @@ var ( FailedTokenRequestDuration prometheus.Observer // SuccessfulTokenRequestDuration comments placeholder, WithLabelValues is a heavy operation, define variable to avoid call it every time. SuccessfulTokenRequestDuration prometheus.Observer + + // PagingPrechargeCounter counts paging pre-charge events (predicted hint + // present and > 0), per resource group. Cold starts and unhinted requests + // are not pre-charged and therefore not counted here. + PagingPrechargeCounter *prometheus.CounterVec + // PagingPrechargeBytesCounter sums bytes used as the pre-charge basis. + PagingPrechargeBytesCounter *prometheus.CounterVec + // PagingActualBytesCounter sums actual read bytes reported after the KV + // RPC for pre-charged requests. Ratio against PagingPrechargeBytesCounter + // reveals over/under-charge factor. + PagingActualBytesCounter *prometheus.CounterVec + // PagingPredictionResidualBytes observes (actual - predicted) bytes for + // pre-charged requests. Shows EMA prediction accuracy. + PagingPredictionResidualBytes *prometheus.HistogramVec + + // PagingNonprechargeCounter counts RPCs that implemented the predicted + // read-bytes interface but reported 0 (e.g. EMA cold-start) and therefore + // ran without pre-charge. Paired with PagingPrechargeCounter this yields + // the cold/ready RPC split from the PD side. + PagingNonprechargeCounter *prometheus.CounterVec + // PagingNonprechargeActualBytes sums the actual read bytes of RPCs that + // skipped pre-charge. Quantifies how much read volume bypassed pre-charge + // throttling — the "cold window" signal for token-bucket pressure + // analysis. + PagingNonprechargeActualBytes *prometheus.CounterVec ) func init() { @@ -156,6 +181,63 @@ func initMetrics(constLabels prometheus.Labels) { // WithLabelValues is a heavy operation, define variable to avoid call it every time. FailedTokenRequestDuration = TokenRequestDuration.WithLabelValues("fail") SuccessfulTokenRequestDuration = TokenRequestDuration.WithLabelValues("success") + + PagingPrechargeCounter = prometheus.NewCounterVec( + prometheus.CounterOpts{ + Namespace: namespace, + Subsystem: requestSubsystem, + Name: "paging_precharge_total", + Help: "Counter of RC paging pre-charge events (PredictedReadBytes hint present and > 0).", + ConstLabels: constLabels, + }, []string{newResourceGroupNameLabel}) + + PagingPrechargeBytesCounter = prometheus.NewCounterVec( + prometheus.CounterOpts{ + Namespace: namespace, + Subsystem: requestSubsystem, + Name: "paging_precharge_bytes_total", + Help: "Sum of bytes used as the RC paging pre-charge basis (predicted hint).", + ConstLabels: constLabels, + }, []string{newResourceGroupNameLabel}) + + PagingActualBytesCounter = prometheus.NewCounterVec( + prometheus.CounterOpts{ + Namespace: namespace, + Subsystem: requestSubsystem, + Name: "paging_actual_bytes_total", + Help: "Sum of actual bytes read by paging KV requests that were pre-charged.", + ConstLabels: constLabels, + }, []string{newResourceGroupNameLabel}) + + PagingPredictionResidualBytes = prometheus.NewHistogramVec( + prometheus.HistogramOpts{ + Namespace: namespace, + Subsystem: requestSubsystem, + Name: "paging_prediction_residual_bytes", + // Signed residual = actual - predicted. Buckets cover both directions + // up to the typical paging budget (a few MB). + Buckets: []float64{-4194304, -1048576, -262144, -65536, -16384, -4096, 0, 4096, 16384, 65536, 262144, 1048576, 4194304}, + Help: "Histogram of (actual_read_bytes - predicted_read_bytes) for pre-charged requests. Shows EMA prediction accuracy.", + ConstLabels: constLabels, + }, []string{newResourceGroupNameLabel}) + + PagingNonprechargeCounter = prometheus.NewCounterVec( + prometheus.CounterOpts{ + Namespace: namespace, + Subsystem: requestSubsystem, + Name: "paging_nonprecharge_total", + Help: "Counter of RC paging RPCs that implemented the predicted hint but reported 0 (e.g. EMA cold-start) and ran without pre-charge.", + ConstLabels: constLabels, + }, []string{newResourceGroupNameLabel}) + + PagingNonprechargeActualBytes = prometheus.NewCounterVec( + prometheus.CounterOpts{ + Namespace: namespace, + Subsystem: requestSubsystem, + Name: "paging_nonprecharge_actual_bytes_total", + Help: "Sum of actual bytes read by paging RPCs that skipped pre-charge (hint=0). Quantifies cold-window read volume bypassing pre-charge throttling.", + ConstLabels: constLabels, + }, []string{newResourceGroupNameLabel}) } // InitAndRegisterMetrics initializes and register metrics. @@ -171,4 +253,10 @@ func InitAndRegisterMetrics(constLabels prometheus.Labels) { prometheus.MustRegister(ResourceGroupTokenRequestCounter) prometheus.MustRegister(LowTokenRequestNotifyCounter) prometheus.MustRegister(TokenConsumedHistogram) + prometheus.MustRegister(PagingPrechargeCounter) + prometheus.MustRegister(PagingPrechargeBytesCounter) + prometheus.MustRegister(PagingActualBytesCounter) + prometheus.MustRegister(PagingPredictionResidualBytes) + prometheus.MustRegister(PagingNonprechargeCounter) + prometheus.MustRegister(PagingNonprechargeActualBytes) } diff --git a/client/resource_group/controller/model.go b/client/resource_group/controller/model.go index 54721969811..f540f1a6b87 100644 --- a/client/resource_group/controller/model.go +++ b/client/resource_group/controller/model.go @@ -52,6 +52,34 @@ type RequestInfo interface { StoreID() uint64 RequestSize() uint64 AccessLocationType() AccessLocationType + // PagingSizeBytes returns the byte budget per page for RC paging. + // 0 means no byte-based pre-charge estimation is applied. + PagingSizeBytes() uint64 +} + +// predictedReadBytesProvider is an optional interface that a RequestInfo +// implementation may satisfy to supply a learned estimate (e.g., from a +// per-logical-scan EMA maintained in TiDB) of how many bytes the request +// will read. When present and > 0, it is used as the byte basis for RC +// paging pre-charge; when absent or zero the request is not pre-charged +// and will be billed in AfterKVRequest by actual read bytes only. +// +// Defined as an optional interface (not a method on RequestInfo) so older +// RequestInfo implementations that have not been updated still compile; +// they simply skip pre-charge. +type predictedReadBytesProvider interface { + PredictedReadBytes() uint64 +} + +// estimatedReadBytes returns the byte basis used for RC paging pre-charge, +// or 0 to skip pre-charge entirely. Only a learned PredictedReadBytes hint +// is honored: PagingSizeBytes is intentionally ignored here so that the +// protocol-level paging cap and the RU-billing pre-charge stay decoupled. +func estimatedReadBytes(req RequestInfo) uint64 { + if p, ok := req.(predictedReadBytesProvider); ok { + return p.PredictedReadBytes() + } + return 0 } // ResponseInfo is the interface of the response information provider. A response should be @@ -101,9 +129,15 @@ func (kc *KVCalculator) BeforeKVRequest(consumption *rmpb.Consumption, req Reque kc.calculateWriteCost(consumption, req) } else { consumption.KvReadRpcCount += 1 - // Read bytes could not be known before the request is executed, - // so we only add the base cost here. consumption.RRU += float64(kc.ReadBaseCost) + float64(kc.ReadPerBatchBaseCost)*defaultAvgBatchProportion + // RC Paging pre-charge: pre-charge the learned read-bytes RU so that + // concurrent workers are throttled at BeforeKVRequest instead of all + // hitting AfterKVRequest settlement at once. Only applies when the + // caller supplies a PredictedReadBytes hint; without it the request + // is billed in AfterKVRequest by actual read bytes only. + if bytesForEst := estimatedReadBytes(req); bytesForEst > 0 { + consumption.RRU += float64(kc.ReadBytesCost) * float64(bytesForEst) + } } if req.AccessLocationType() == AccessCrossZone { if req.IsWrite() { @@ -138,6 +172,13 @@ func (kc *KVCalculator) AfterKVRequest(consumption *rmpb.Consumption, req Reques if !req.IsWrite() { // For now, we can only collect the KV CPU cost for a read request. kc.calculateCPUCost(consumption, res) + // RC Paging settlement: subtract the pre-charged bytes RU added in + // BeforeKVRequest so the net total (pre-charge + settle) equals + // baseCost + actualCost. Symmetric with BeforeKVRequest: when no + // hint was supplied nothing was pre-charged, so nothing is refunded. + if bytesForEst := estimatedReadBytes(req); bytesForEst > 0 { + consumption.RRU -= float64(kc.ReadBytesCost) * float64(bytesForEst) + } } else if !res.Succeed() { // If the write request is not successfully returned, we need to pay back the WRU cost. kc.payBackWriteCost(consumption, req) diff --git a/client/resource_group/controller/testutil.go b/client/resource_group/controller/testutil.go index a055b32fa6f..f2a6d9ab969 100644 --- a/client/resource_group/controller/testutil.go +++ b/client/resource_group/controller/testutil.go @@ -22,11 +22,13 @@ import "time" // TestRequestInfo is used to test the request info interface. type TestRequestInfo struct { - isWrite bool - writeBytes uint64 - numReplicas int64 - storeID uint64 - accessType AccessLocationType + isWrite bool + writeBytes uint64 + numReplicas int64 + storeID uint64 + accessType AccessLocationType + pagingSizeBytes uint64 + predictedReadBytes uint64 } // NewTestRequestInfo creates a new TestRequestInfo. @@ -70,6 +72,17 @@ func (tri *TestRequestInfo) AccessLocationType() AccessLocationType { return tri.accessType } +// PagingSizeBytes implements the RequestInfo interface. +func (tri *TestRequestInfo) PagingSizeBytes() uint64 { + return tri.pagingSizeBytes +} + +// PredictedReadBytes implements the optional predictedReadBytesProvider +// interface so tests can exercise the EMA-based hint path. +func (tri *TestRequestInfo) PredictedReadBytes() uint64 { + return tri.predictedReadBytes +} + // TestResponseInfo is used to test the response info interface. type TestResponseInfo struct { readBytes uint64