From bc1a33a7cef2699a8161079cf902f8da23dc9103 Mon Sep 17 00:00:00 2001 From: JmPotato Date: Wed, 1 Apr 2026 12:59:48 +0800 Subject: [PATCH 01/10] resource_control: add paging bytes pre-charge in RU calculation Add PagingSizeBytes() to RequestInfo interface. When a read request carries a byte budget (from RC paging), BeforeKVRequest pre-charges the estimated read RU in Phase 1, and AfterKVRequest subtracts it in Phase 2 to maintain correct total cost. This makes concurrent workers throttle at Phase 1 instead of all hitting Phase 2 simultaneously. Signed-off-by: JmPotato --- client/resource_group/controller/model.go | 17 +++++++++++++++-- client/resource_group/controller/testutil.go | 5 +++++ 2 files changed, 20 insertions(+), 2 deletions(-) diff --git a/client/resource_group/controller/model.go b/client/resource_group/controller/model.go index 54721969811..3e346037367 100644 --- a/client/resource_group/controller/model.go +++ b/client/resource_group/controller/model.go @@ -52,6 +52,9 @@ type RequestInfo interface { StoreID() uint64 RequestSize() uint64 AccessLocationType() AccessLocationType + // PagingSizeBytes returns the byte budget per page for RC paging. + // 0 means no byte-based pre-charge estimation is applied. + PagingSizeBytes() uint64 } // ResponseInfo is the interface of the response information provider. A response should be @@ -101,9 +104,13 @@ func (kc *KVCalculator) BeforeKVRequest(consumption *rmpb.Consumption, req Reque kc.calculateWriteCost(consumption, req) } else { consumption.KvReadRpcCount += 1 - // Read bytes could not be known before the request is executed, - // so we only add the base cost here. consumption.RRU += float64(kc.ReadBaseCost) + float64(kc.ReadPerBatchBaseCost)*defaultAvgBatchProportion + // RC Paging pre-charge: if the request has a byte budget, pre-charge + // the estimated read bytes RU so that concurrent workers are throttled + // at Phase 1 instead of all hitting Phase 2 at once. + if pagingSizeBytes := req.PagingSizeBytes(); pagingSizeBytes > 0 { + consumption.RRU += float64(kc.ReadBytesCost) * float64(pagingSizeBytes) + } } if req.AccessLocationType() == AccessCrossZone { if req.IsWrite() { @@ -138,6 +145,12 @@ func (kc *KVCalculator) AfterKVRequest(consumption *rmpb.Consumption, req Reques if !req.IsWrite() { // For now, we can only collect the KV CPU cost for a read request. kc.calculateCPUCost(consumption, res) + // RC Paging settlement: subtract the pre-charged bytes RU added in + // BeforeKVRequest so the net total (Phase 1 + Phase 2) equals + // baseCost + actualCost. + if pagingSizeBytes := req.PagingSizeBytes(); pagingSizeBytes > 0 { + consumption.RRU -= float64(kc.ReadBytesCost) * float64(pagingSizeBytes) + } } else if !res.Succeed() { // If the write request is not successfully returned, we need to pay back the WRU cost. kc.payBackWriteCost(consumption, req) diff --git a/client/resource_group/controller/testutil.go b/client/resource_group/controller/testutil.go index a055b32fa6f..38a095b320e 100644 --- a/client/resource_group/controller/testutil.go +++ b/client/resource_group/controller/testutil.go @@ -70,6 +70,11 @@ func (tri *TestRequestInfo) AccessLocationType() AccessLocationType { return tri.accessType } +// PagingSizeBytes implements the RequestInfo interface. +func (*TestRequestInfo) PagingSizeBytes() uint64 { + return 0 +} + // TestResponseInfo is used to test the response info interface. type TestResponseInfo struct { readBytes uint64 From b98ff48814c2041d161f31840167c95ca5b393c1 Mon Sep 17 00:00:00 2001 From: JmPotato Date: Wed, 1 Apr 2026 13:31:04 +0800 Subject: [PATCH 02/10] resource_control: add unit test for paging bytes pre-charge Test that BeforeKVRequest pre-charges pagingSizeBytes * ReadBytesCost, AfterKVRequest subtracts it, and the net total equals baseCost + actualCost. Signed-off-by: JmPotato --- .../controller/group_controller_test.go | 49 +++++++++++++++++++ client/resource_group/controller/testutil.go | 15 +++--- 2 files changed, 57 insertions(+), 7 deletions(-) diff --git a/client/resource_group/controller/group_controller_test.go b/client/resource_group/controller/group_controller_test.go index f916f5e56c4..547a4f894bd 100644 --- a/client/resource_group/controller/group_controller_test.go +++ b/client/resource_group/controller/group_controller_test.go @@ -208,6 +208,55 @@ func TestOnResponseWaitConsumption(t *testing.T) { verify() } +func TestPagingSizeBytesPreCharge(t *testing.T) { + re := require.New(t) + cfg := DefaultRUConfig() + kvCalc := newKVCalculator(cfg) + + // Phase 1: BeforeKVRequest with pagingSizeBytes should pre-charge + // baseCost + pagingSizeBytes * ReadBytesCost. + pagingSizeBytes := uint64(4 * 1024 * 1024) // 4 MB + req := &TestRequestInfo{ + isWrite: false, + pagingSizeBytes: pagingSizeBytes, + } + phase1 := &rmpb.Consumption{} + kvCalc.BeforeKVRequest(phase1, req) + + baseCost := float64(cfg.ReadBaseCost) + float64(cfg.ReadPerBatchBaseCost)*0.7 + bytesCost := float64(cfg.ReadBytesCost) * float64(pagingSizeBytes) + re.InDelta(baseCost+bytesCost, phase1.RRU, 1e-6, + "Phase 1 should pre-charge baseCost + bytes RU") + + // Phase 2: AfterKVRequest should subtract the pre-charged bytes RU. + resp := &TestResponseInfo{ + readBytes: 2 * 1024 * 1024, // actual read 2 MB + kvCPU: 10 * time.Millisecond, + succeed: true, + } + phase2 := &rmpb.Consumption{} + kvCalc.AfterKVRequest(phase2, req, resp) + + actualReadCost := float64(cfg.ReadBytesCost) * float64(resp.readBytes) + cpuCost := float64(cfg.CPUMsCost) * 10.0 + expectedPhase2RRU := actualReadCost + cpuCost - bytesCost + re.InDelta(expectedPhase2RRU, phase2.RRU, 1e-6, + "Phase 2 should be actualCost - preCharged bytesCost") + + // Net total should equal baseCost + actualCost (no double-counting). + totalRRU := phase1.RRU + phase2.RRU + expectedTotal := baseCost + actualReadCost + cpuCost + re.InDelta(expectedTotal, totalRRU, 1e-6, + "Total RRU across Phase 1+2 should equal baseCost + actualCost") + + // Without pagingSizeBytes, Phase 1 should only charge baseCost. + reqNoPaging := &TestRequestInfo{isWrite: false} + noPaging := &rmpb.Consumption{} + kvCalc.BeforeKVRequest(noPaging, reqNoPaging) + re.InDelta(baseCost, noPaging.RRU, 1e-6, + "Without paging, Phase 1 should only charge baseCost") +} + func TestResourceGroupThrottledError(t *testing.T) { re := require.New(t) gc := createTestGroupCostController(re) diff --git a/client/resource_group/controller/testutil.go b/client/resource_group/controller/testutil.go index 38a095b320e..ea67f035233 100644 --- a/client/resource_group/controller/testutil.go +++ b/client/resource_group/controller/testutil.go @@ -22,11 +22,12 @@ import "time" // TestRequestInfo is used to test the request info interface. type TestRequestInfo struct { - isWrite bool - writeBytes uint64 - numReplicas int64 - storeID uint64 - accessType AccessLocationType + isWrite bool + writeBytes uint64 + numReplicas int64 + storeID uint64 + accessType AccessLocationType + pagingSizeBytes uint64 } // NewTestRequestInfo creates a new TestRequestInfo. @@ -71,8 +72,8 @@ func (tri *TestRequestInfo) AccessLocationType() AccessLocationType { } // PagingSizeBytes implements the RequestInfo interface. -func (*TestRequestInfo) PagingSizeBytes() uint64 { - return 0 +func (tri *TestRequestInfo) PagingSizeBytes() uint64 { + return tri.pagingSizeBytes } // TestResponseInfo is used to test the response info interface. From fda21466b565da4c7fd73e10ab05145295ac865e Mon Sep 17 00:00:00 2001 From: JmPotato Date: Thu, 9 Apr 2026 13:35:22 +0800 Subject: [PATCH 03/10] resource_control: refund excess pre-charged tokens on response settlement MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit When paging byte budget (pagingSizeBytes) is enabled, BeforeKVRequest pre-charges ReadBytesCost * pagingSizeBytes RRU into the token limiter. AfterKVRequest then computes the actual cost and subtracts the pre-charge. If the actual cost is less than the pre-charge (common case, since pagingSizeBytes is an upper bound), the settlement delta is negative. Previously, the negative delta was correctly recorded in the consumption counter but silently dropped by the token limiter due to a `v > 0` guard — causing permanent token leakage proportional to (pagingSizeBytes - actualReadBytes) per request. Fix: add Limiter.RefundTokens (inverse of RemoveTokens) and call it from both onResponseImpl and onResponseWaitImpl when the settlement delta is negative. This ensures the limiter's available token balance accurately reflects actual consumption after each request completes. RefundTokens design notes: - No burst cap applied (consistent with Reconfigure; getTokens handles lazy capping on the next limiter operation). - No maybeNotify call (refunding moves balance away from the low-token threshold, never toward it). Signed-off-by: JmPotato --- .../controller/group_controller.go | 29 +-- .../controller/group_controller_test.go | 166 ++++++++++++++++++ client/resource_group/controller/limiter.go | 20 +++ .../resource_group/controller/limiter_test.go | 36 ++++ 4 files changed, 240 insertions(+), 11 deletions(-) diff --git a/client/resource_group/controller/group_controller.go b/client/resource_group/controller/group_controller.go index 4fca2091c00..8f3b2917465 100644 --- a/client/resource_group/controller/group_controller.go +++ b/client/resource_group/controller/group_controller.go @@ -605,6 +605,8 @@ func (gc *groupCostController) onResponseImpl( counter := gc.run.requestUnitTokens if v := getRUValueFromConsumption(delta); v > 0 { counter.limiter.RemoveTokens(time.Now(), v) + } else if v < 0 { + counter.limiter.RefundTokens(time.Now(), -v) } } @@ -634,19 +636,24 @@ func (gc *groupCostController) onResponseWaitImpl( } var waitDuration time.Duration if !gc.burstable.Load() { - allowDebt := delta.ReadBytes+delta.WriteBytes < bigRequestThreshold || !gc.isThrottled.Load() - d, err := gc.acquireTokens(ctx, delta, &waitDuration, allowDebt) - if err != nil { - if errs.ErrClientResourceGroupThrottled.Equal(err) { - gc.metrics.failedRequestCounterWithThrottled.Inc() - gc.metrics.failedLimitReserveDuration.Observe(d.Seconds()) - } else { - gc.metrics.failedRequestCounterWithOthers.Inc() + v := getRUValueFromConsumption(delta) + if v > 0 { + allowDebt := delta.ReadBytes+delta.WriteBytes < bigRequestThreshold || !gc.isThrottled.Load() + d, err := gc.acquireTokens(ctx, delta, &waitDuration, allowDebt) + if err != nil { + if errs.ErrClientResourceGroupThrottled.Equal(err) { + gc.metrics.failedRequestCounterWithThrottled.Inc() + gc.metrics.failedLimitReserveDuration.Observe(d.Seconds()) + } else { + gc.metrics.failedRequestCounterWithOthers.Inc() + } + return nil, waitDuration, err } - return nil, waitDuration, err + gc.metrics.successfulRequestDuration.Observe(d.Seconds()) + waitDuration += d + } else if v < 0 { + gc.run.requestUnitTokens.limiter.RefundTokens(time.Now(), -v) } - gc.metrics.successfulRequestDuration.Observe(d.Seconds()) - waitDuration += d } gc.mu.Lock() diff --git a/client/resource_group/controller/group_controller_test.go b/client/resource_group/controller/group_controller_test.go index 547a4f894bd..e88e733e19a 100644 --- a/client/resource_group/controller/group_controller_test.go +++ b/client/resource_group/controller/group_controller_test.go @@ -257,6 +257,172 @@ func TestPagingSizeBytesPreCharge(t *testing.T) { "Without paging, Phase 1 should only charge baseCost") } +func TestPagingPreChargeTokenRefund(t *testing.T) { + re := require.New(t) + gc := createTestGroupCostController(re) + + // Give the limiter a known amount of tokens with no fill rate for precise measurement. + initialTokens := float64(100000) + gc.run.requestUnitTokens.limiter.Reconfigure(time.Now(), tokenBucketReconfigureArgs{ + newTokens: initialTokens, + newFillRate: 0, + newBurst: 0, + }) + + pagingSizeBytes := uint64(4 * 1024 * 1024) // 4 MB pre-charge + actualReadBytes := uint64(1 * 1024 * 1024) // 1 MB actual + + req := &TestRequestInfo{ + isWrite: false, + pagingSizeBytes: pagingSizeBytes, + } + resp := &TestResponseInfo{ + readBytes: actualReadBytes, + succeed: true, + } + + // Pre-charge reserves tokens from the limiter. + _, _, _, _, err := gc.onRequestWaitImpl(context.TODO(), req) + re.NoError(err) + tokensAfterPreCharge := gc.run.requestUnitTokens.limiter.AvailableTokens(time.Now()) + + // Response settlement should refund excess tokens. + _, _, err = gc.onResponseWaitImpl(context.TODO(), req, resp) + re.NoError(err) + tokensAfterSettlement := gc.run.requestUnitTokens.limiter.AvailableTokens(time.Now()) + + // The limiter should have more tokens after settlement than after pre-charge, + // because the refund (pre-charge - actual) exceeds the actual read cost. + cfg := DefaultRUConfig() + preChargeCost := float64(cfg.ReadBytesCost) * float64(pagingSizeBytes) + actualCost := float64(cfg.ReadBytesCost) * float64(actualReadBytes) + expectedRefund := preChargeCost - actualCost + re.Positive(expectedRefund, "sanity: pre-charge should exceed actual cost") + + re.InDelta(tokensAfterPreCharge+expectedRefund, tokensAfterSettlement, 1.0, + "limiter should be refunded the excess pre-charged tokens") + + // Verify net consumption is correct: baseCost + actualReadCost. + gc.mu.Lock() + netRRU := gc.mu.consumption.RRU + gc.mu.Unlock() + baseCost := float64(cfg.ReadBaseCost) + float64(cfg.ReadPerBatchBaseCost)*defaultAvgBatchProportion + expectedNetRRU := baseCost + actualCost + re.InDelta(expectedNetRRU, netRRU, 1e-6, + "net consumption should equal baseCost + actualReadCost") +} + +func TestPagingPreChargeNoRefundWhenActualExceedsEstimate(t *testing.T) { + re := require.New(t) + gc := createTestGroupCostController(re) + + initialTokens := float64(100000) + gc.run.requestUnitTokens.limiter.Reconfigure(time.Now(), tokenBucketReconfigureArgs{ + newTokens: initialTokens, + newFillRate: 0, + newBurst: 0, + }) + + pagingSizeBytes := uint64(1 * 1024 * 1024) // 1 MB pre-charge + actualReadBytes := uint64(4 * 1024 * 1024) // 4 MB actual (exceeds estimate) + + req := &TestRequestInfo{ + isWrite: false, + pagingSizeBytes: pagingSizeBytes, + } + resp := &TestResponseInfo{ + readBytes: actualReadBytes, + succeed: true, + } + + _, _, _, _, err := gc.onRequestWaitImpl(context.TODO(), req) + re.NoError(err) + tokensAfterPreCharge := gc.run.requestUnitTokens.limiter.AvailableTokens(time.Now()) + + _, _, err = gc.onResponseWaitImpl(context.TODO(), req, resp) + re.NoError(err) + tokensAfterSettlement := gc.run.requestUnitTokens.limiter.AvailableTokens(time.Now()) + + // Actual exceeds pre-charge, so settlement should consume more tokens (not refund). + re.Less(tokensAfterSettlement, tokensAfterPreCharge, + "when actual exceeds pre-charge, settlement should consume tokens") +} + +func TestOnResponseImplPagingRefund(t *testing.T) { + re := require.New(t) + gc := createTestGroupCostController(re) + + initialTokens := float64(100000) + gc.run.requestUnitTokens.limiter.Reconfigure(time.Now(), tokenBucketReconfigureArgs{ + newTokens: initialTokens, + newFillRate: 0, + newBurst: 0, + }) + + pagingSizeBytes := uint64(4 * 1024 * 1024) // 4 MB pre-charge + actualReadBytes := uint64(512 * 1024) // 512 KB actual + + req := &TestRequestInfo{ + isWrite: false, + pagingSizeBytes: pagingSizeBytes, + } + resp := &TestResponseInfo{ + readBytes: actualReadBytes, + succeed: true, + } + + // Pre-charge + _, _, _, _, err := gc.onRequestWaitImpl(context.TODO(), req) + re.NoError(err) + tokensAfterPreCharge := gc.run.requestUnitTokens.limiter.AvailableTokens(time.Now()) + + // Settlement via onResponseImpl (non-waiting path) + _, err = gc.onResponseImpl(req, resp) + re.NoError(err) + tokensAfterSettlement := gc.run.requestUnitTokens.limiter.AvailableTokens(time.Now()) + + // Should have refunded tokens. + re.Greater(tokensAfterSettlement, tokensAfterPreCharge, + "onResponseImpl should refund excess pre-charged tokens") +} + +func TestPagingPreChargeZeroDelta(t *testing.T) { + re := require.New(t) + gc := createTestGroupCostController(re) + + initialTokens := float64(100000) + gc.run.requestUnitTokens.limiter.Reconfigure(time.Now(), tokenBucketReconfigureArgs{ + newTokens: initialTokens, + newFillRate: 0, + newBurst: 0, + }) + + // Set actual read bytes equal to pagingSizeBytes so the read-bytes delta is zero. + // The only settlement cost should be CPU time (which we set to zero here). + pagingSizeBytes := uint64(2 * 1024 * 1024) + req := &TestRequestInfo{ + isWrite: false, + pagingSizeBytes: pagingSizeBytes, + } + resp := &TestResponseInfo{ + readBytes: pagingSizeBytes, // exact match + succeed: true, + } + + _, _, _, _, err := gc.onRequestWaitImpl(context.TODO(), req) + re.NoError(err) + tokensAfterPreCharge := gc.run.requestUnitTokens.limiter.AvailableTokens(time.Now()) + + _, _, err = gc.onResponseWaitImpl(context.TODO(), req, resp) + re.NoError(err) + tokensAfterSettlement := gc.run.requestUnitTokens.limiter.AvailableTokens(time.Now()) + + // With zero CPU cost and exact byte match, settlement delta is zero. + // No tokens should be consumed or refunded in the settlement step. + re.InDelta(tokensAfterPreCharge, tokensAfterSettlement, 1e-6, + "exact byte match should produce zero settlement delta") +} + func TestResourceGroupThrottledError(t *testing.T) { re := require.New(t) gc := createTestGroupCostController(re) diff --git a/client/resource_group/controller/limiter.go b/client/resource_group/controller/limiter.go index 86e1440946e..d592210e251 100644 --- a/client/resource_group/controller/limiter.go +++ b/client/resource_group/controller/limiter.go @@ -325,6 +325,26 @@ func (lim *Limiter) RemoveTokens(now time.Time, amount float64) { lim.maybeNotify() } +// RefundTokens returns previously consumed tokens back to the limiter. +// This is the inverse of RemoveTokens: it increases the available token count. +// It is used when a pre-charged estimate (e.g. paging byte budget) exceeds +// the actual cost observed after the request completes. +// +// No burst cap is applied here — consistent with Reconfigure. The lazy cap +// in getTokens() normalizes the balance on the next limiter operation. +// No low-token notification is needed because refunding moves the balance +// away from the low-token threshold, never toward it. +func (lim *Limiter) RefundTokens(now time.Time, amount float64) { + lim.mu.Lock() + defer lim.mu.Unlock() + if lim.burst < 0 || lim.fillRate == Inf { + return + } + _, tokens := lim.getTokens(now) + lim.last = now + lim.tokens = tokens + amount +} + type tokenBucketReconfigureArgs struct { newTokens float64 newFillRate float64 diff --git a/client/resource_group/controller/limiter_test.go b/client/resource_group/controller/limiter_test.go index f5b2c144757..65978093f7e 100644 --- a/client/resource_group/controller/limiter_test.go +++ b/client/resource_group/controller/limiter_test.go @@ -139,6 +139,42 @@ func TestReconfig(t *testing.T) { re.Equal(int64(-1), lim.GetBurst()) } +func TestRefundTokens(t *testing.T) { + re := require.New(t) + nc := make(chan notifyMsg, 1) + lim := NewLimiter(t0, 0, 0, 100, nc) + + // Consume some tokens. + lim.RemoveTokens(t0, 30) + checkTokens(re, lim, t0, 70) + + // Refund part of them. + lim.RefundTokens(t0, 20) + checkTokens(re, lim, t0, 90) + + // Refund beyond the initial amount — allowed (no burst cap when burst==0). + lim.RefundTokens(t0, 50) + checkTokens(re, lim, t0, 140) + + // With burst > 0, refund can temporarily exceed burst, + // but getTokens (called by AvailableTokens) caps it. + limBurst := NewLimiter(t0, 0, 100, 50, nc) + limBurst.RemoveTokens(t0, 30) + checkTokens(re, limBurst, t0, 20) + limBurst.RefundTokens(t0, 80) // tokens = 20 + 80 = 100, but burst = 100 + checkTokens(re, limBurst, t0, 100) + + // Refund beyond burst — capped by getTokens. + limBurst.RefundTokens(t0, 50) // tokens = 100 + 50 = 150, capped to 100 + checkTokens(re, limBurst, t0, 100) + + // Burstable (burst < 0): RefundTokens is a no-op. + limUnlimited := NewLimiter(t0, 0, -1, 100, nc) + limUnlimited.RefundTokens(t0, 50) + // Should still be 100 (burst<0 short-circuits). + checkTokens(re, limUnlimited, t0, 100) +} + func TestNotify(t *testing.T) { nc := make(chan notifyMsg, 1) lim := NewLimiter(t0, 1, 0, 0, nc) From f01d119b826f72f5910cff9ef1e666391439134e Mon Sep 17 00:00:00 2001 From: Yuhao Zhang Date: Tue, 14 Apr 2026 15:15:33 +0800 Subject: [PATCH 04/10] resource_control: add PredictedReadBytes hint for RC paging pre-charge Introduce an optional predictedReadBytesProvider interface on RequestInfo. When a caller (e.g. TiDB maintaining a per-logical-scan EMA across paging RPCs) supplies a non-zero PredictedReadBytes, BeforeKVRequest/AfterKVRequest use that value as the byte basis for the paging pre-charge instead of PagingSizeBytes. PagingSizeBytes remains the fallback and worst-case cap. This lets TiDB replace the current fixed 4 MiB pre-charge (which matches the paging byte budget but typically overshoots actual scanned bytes and stalls concurrent workers at Phase 1) with a learned estimate, without changing kvproto or TiKV behavior. The hint is added as an optional interface (not a method on RequestInfo) so existing RequestInfo implementations compile unchanged; they continue to fall back to PagingSizeBytes. Ref: per-logical-scan EMA pre-deduction design Signed-off-by: Yuhao Zhang --- .../controller/group_controller_test.go | 64 +++++++++++++++++++ client/resource_group/controller/model.go | 41 ++++++++++-- client/resource_group/controller/testutil.go | 19 ++++-- 3 files changed, 112 insertions(+), 12 deletions(-) diff --git a/client/resource_group/controller/group_controller_test.go b/client/resource_group/controller/group_controller_test.go index e88e733e19a..fcf8cad5177 100644 --- a/client/resource_group/controller/group_controller_test.go +++ b/client/resource_group/controller/group_controller_test.go @@ -257,6 +257,70 @@ func TestPagingSizeBytesPreCharge(t *testing.T) { "Without paging, Phase 1 should only charge baseCost") } +func TestPredictedReadBytesOverridesPagingSizeBytes(t *testing.T) { + re := require.New(t) + cfg := DefaultRUConfig() + kvCalc := newKVCalculator(cfg) + + // When PredictedReadBytes (the EMA hint) is > 0, it should override + // PagingSizeBytes as the pre-charge basis. PagingSizeBytes is kept as a + // safety cap and worst-case fallback; the hint gives a tighter estimate. + pagingSizeBytes := uint64(4 * 1024 * 1024) // 4 MB worst-case cap + predictedReadBytes := uint64(256 * 1024) // 256 KB learned estimate + req := &TestRequestInfo{ + isWrite: false, + pagingSizeBytes: pagingSizeBytes, + predictedReadBytes: predictedReadBytes, + } + + phase1 := &rmpb.Consumption{} + kvCalc.BeforeKVRequest(phase1, req) + + baseCost := float64(cfg.ReadBaseCost) + float64(cfg.ReadPerBatchBaseCost)*defaultAvgBatchProportion + hintCost := float64(cfg.ReadBytesCost) * float64(predictedReadBytes) + re.InDelta(baseCost+hintCost, phase1.RRU, 1e-6, + "Phase 1 should pre-charge based on PredictedReadBytes, not PagingSizeBytes") + + // Phase 2 should subtract the same hint-based basis, preserving the + // invariant that Phase 1 + Phase 2 == baseCost + actualCost. + actualReadBytes := uint64(300 * 1024) // close to the prediction + resp := &TestResponseInfo{ + readBytes: actualReadBytes, + kvCPU: 0, + succeed: true, + } + phase2 := &rmpb.Consumption{} + kvCalc.AfterKVRequest(phase2, req, resp) + + actualReadCost := float64(cfg.ReadBytesCost) * float64(actualReadBytes) + expectedPhase2RRU := actualReadCost - hintCost + re.InDelta(expectedPhase2RRU, phase2.RRU, 1e-6, + "Phase 2 should settle using the same hint basis as Phase 1") + + totalRRU := phase1.RRU + phase2.RRU + re.InDelta(baseCost+actualReadCost, totalRRU, 1e-6, + "Total RRU across Phase 1+2 should still equal baseCost + actualCost") + + // When the hint is zero, fall back to PagingSizeBytes (old behavior). + reqFallback := &TestRequestInfo{ + isWrite: false, + pagingSizeBytes: pagingSizeBytes, + // predictedReadBytes left at 0 + } + phase1Fallback := &rmpb.Consumption{} + kvCalc.BeforeKVRequest(phase1Fallback, reqFallback) + fallbackBytesCost := float64(cfg.ReadBytesCost) * float64(pagingSizeBytes) + re.InDelta(baseCost+fallbackBytesCost, phase1Fallback.RRU, 1e-6, + "With no hint, pre-charge should fall back to PagingSizeBytes") + + // When both are zero, no byte-based pre-charge is applied. + reqNone := &TestRequestInfo{isWrite: false} + phase1None := &rmpb.Consumption{} + kvCalc.BeforeKVRequest(phase1None, reqNone) + re.InDelta(baseCost, phase1None.RRU, 1e-6, + "Without hint or paging, Phase 1 should only charge baseCost") +} + func TestPagingPreChargeTokenRefund(t *testing.T) { re := require.New(t) gc := createTestGroupCostController(re) diff --git a/client/resource_group/controller/model.go b/client/resource_group/controller/model.go index 3e346037367..5c49f91aedc 100644 --- a/client/resource_group/controller/model.go +++ b/client/resource_group/controller/model.go @@ -57,6 +57,32 @@ type RequestInfo interface { PagingSizeBytes() uint64 } +// predictedReadBytesProvider is an optional interface that a RequestInfo +// implementation may satisfy to supply a learned estimate (e.g., from a +// per-logical-scan EMA maintained in TiDB) of how many bytes the request +// will read. When present and > 0, it overrides PagingSizeBytes as the +// byte basis for RC paging pre-charge. +// +// Defined as an optional interface (not a method on RequestInfo) so older +// RequestInfo implementations that have not been updated still compile and +// behave as before (falling back to PagingSizeBytes). +type predictedReadBytesProvider interface { + PredictedReadBytes() uint64 +} + +// estimatedReadBytes returns the byte basis used for RC paging pre-charge. +// It prefers a learned PredictedReadBytes hint when the RequestInfo +// implements the optional provider and returns a non-zero value; otherwise +// it falls back to PagingSizeBytes (the paging byte budget / worst-case cap). +func estimatedReadBytes(req RequestInfo) uint64 { + if p, ok := req.(predictedReadBytesProvider); ok { + if hint := p.PredictedReadBytes(); hint > 0 { + return hint + } + } + return req.PagingSizeBytes() +} + // ResponseInfo is the interface of the response information provider. A response should be // able to tell how many bytes it read and KV CPU cost in milliseconds. type ResponseInfo interface { @@ -107,9 +133,11 @@ func (kc *KVCalculator) BeforeKVRequest(consumption *rmpb.Consumption, req Reque consumption.RRU += float64(kc.ReadBaseCost) + float64(kc.ReadPerBatchBaseCost)*defaultAvgBatchProportion // RC Paging pre-charge: if the request has a byte budget, pre-charge // the estimated read bytes RU so that concurrent workers are throttled - // at Phase 1 instead of all hitting Phase 2 at once. - if pagingSizeBytes := req.PagingSizeBytes(); pagingSizeBytes > 0 { - consumption.RRU += float64(kc.ReadBytesCost) * float64(pagingSizeBytes) + // at Phase 1 instead of all hitting Phase 2 at once. Prefer a learned + // PredictedReadBytes hint when the caller supplies one; otherwise fall + // back to PagingSizeBytes (the paging byte budget / worst-case cap). + if bytesForEst := estimatedReadBytes(req); bytesForEst > 0 { + consumption.RRU += float64(kc.ReadBytesCost) * float64(bytesForEst) } } if req.AccessLocationType() == AccessCrossZone { @@ -147,9 +175,10 @@ func (kc *KVCalculator) AfterKVRequest(consumption *rmpb.Consumption, req Reques kc.calculateCPUCost(consumption, res) // RC Paging settlement: subtract the pre-charged bytes RU added in // BeforeKVRequest so the net total (Phase 1 + Phase 2) equals - // baseCost + actualCost. - if pagingSizeBytes := req.PagingSizeBytes(); pagingSizeBytes > 0 { - consumption.RRU -= float64(kc.ReadBytesCost) * float64(pagingSizeBytes) + // baseCost + actualCost. Use the same basis (hint or PagingSizeBytes) + // that BeforeKVRequest used. + if bytesForEst := estimatedReadBytes(req); bytesForEst > 0 { + consumption.RRU -= float64(kc.ReadBytesCost) * float64(bytesForEst) } } else if !res.Succeed() { // If the write request is not successfully returned, we need to pay back the WRU cost. diff --git a/client/resource_group/controller/testutil.go b/client/resource_group/controller/testutil.go index ea67f035233..f2a6d9ab969 100644 --- a/client/resource_group/controller/testutil.go +++ b/client/resource_group/controller/testutil.go @@ -22,12 +22,13 @@ import "time" // TestRequestInfo is used to test the request info interface. type TestRequestInfo struct { - isWrite bool - writeBytes uint64 - numReplicas int64 - storeID uint64 - accessType AccessLocationType - pagingSizeBytes uint64 + isWrite bool + writeBytes uint64 + numReplicas int64 + storeID uint64 + accessType AccessLocationType + pagingSizeBytes uint64 + predictedReadBytes uint64 } // NewTestRequestInfo creates a new TestRequestInfo. @@ -76,6 +77,12 @@ func (tri *TestRequestInfo) PagingSizeBytes() uint64 { return tri.pagingSizeBytes } +// PredictedReadBytes implements the optional predictedReadBytesProvider +// interface so tests can exercise the EMA-based hint path. +func (tri *TestRequestInfo) PredictedReadBytes() uint64 { + return tri.predictedReadBytes +} + // TestResponseInfo is used to test the response info interface. type TestResponseInfo struct { readBytes uint64 From b053650749df702fb2743692cc406906118f1100 Mon Sep 17 00:00:00 2001 From: Yuhao Zhang Date: Tue, 14 Apr 2026 17:19:16 +0800 Subject: [PATCH 05/10] client/resource_group: add paging pre-charge observability metrics Adds four Prometheus metrics under namespace resource_manager_client to make the PredictedReadBytes-based paging pre-charge fix directly observable end-to-end: - paging_precharge_source_total{source=predicted|fallback} - paging_precharge_bytes_total{source} - paging_actual_bytes_total{source} - paging_prediction_residual_bytes (histogram, predicted path only) They are wired into onRequestWaitImpl / onResponseImpl so no change to the ResourceCalculator interface is needed. Labels are cached per group to keep the hot path allocation-free. Signed-off-by: Yuhao Zhang --- .../controller/group_controller.go | 63 ++++++++++++++++++ .../controller/metrics/metrics.go | 66 +++++++++++++++++++ 2 files changed, 129 insertions(+) diff --git a/client/resource_group/controller/group_controller.go b/client/resource_group/controller/group_controller.go index 8f3b2917465..3121807635a 100644 --- a/client/resource_group/controller/group_controller.go +++ b/client/resource_group/controller/group_controller.go @@ -106,6 +106,16 @@ type groupMetricsCollection struct { tokenRequestCounter prometheus.Counter runningKVRequestCounter prometheus.Gauge consumeTokenHistogram prometheus.Observer + + // Paging pre-charge observability: cached per-(RG, source) so the hot path + // avoids WithLabelValues on every KV request. + prechargeSourcePredicted prometheus.Counter + prechargeSourceFallback prometheus.Counter + prechargeBytesPredicted prometheus.Counter + prechargeBytesFallback prometheus.Counter + actualBytesPredicted prometheus.Counter + actualBytesFallback prometheus.Counter + predictionResidualBytes prometheus.Observer } func initMetrics(oldName, name string) *groupMetricsCollection { @@ -122,6 +132,53 @@ func initMetrics(oldName, name string) *groupMetricsCollection { tokenRequestCounter: metrics.ResourceGroupTokenRequestCounter.WithLabelValues(oldName, name), runningKVRequestCounter: metrics.GroupRunningKVRequestCounter.WithLabelValues(name), consumeTokenHistogram: metrics.TokenConsumedHistogram.WithLabelValues(name), + + prechargeSourcePredicted: metrics.PagingPrechargeSourceCounter.WithLabelValues(name, metrics.SourcePredicted), + prechargeSourceFallback: metrics.PagingPrechargeSourceCounter.WithLabelValues(name, metrics.SourceFallback), + prechargeBytesPredicted: metrics.PagingPrechargeBytesCounter.WithLabelValues(name, metrics.SourcePredicted), + prechargeBytesFallback: metrics.PagingPrechargeBytesCounter.WithLabelValues(name, metrics.SourceFallback), + actualBytesPredicted: metrics.PagingActualBytesCounter.WithLabelValues(name, metrics.SourcePredicted), + actualBytesFallback: metrics.PagingActualBytesCounter.WithLabelValues(name, metrics.SourceFallback), + predictionResidualBytes: metrics.PagingPredictionResidualBytes.WithLabelValues(name), + } +} + +// estimatePrechargeSource reports which source RC paging pre-charge will use +// for req and the byte basis it would charge. Mirrors estimatedReadBytes in +// model.go but also returns the source label so we can instrument per path. +// Returns ("", 0) when there is no paging pre-charge to observe. +func estimatePrechargeSource(req RequestInfo) (source string, bytesForEst uint64) { + if p, ok := req.(predictedReadBytesProvider); ok { + if hint := p.PredictedReadBytes(); hint > 0 { + return metrics.SourcePredicted, hint + } + } + if b := req.PagingSizeBytes(); b > 0 { + return metrics.SourceFallback, b + } + return "", 0 +} + +func (gmc *groupMetricsCollection) observePagingPrecharge(source string, bytesForEst uint64) { + switch source { + case metrics.SourcePredicted: + gmc.prechargeSourcePredicted.Inc() + gmc.prechargeBytesPredicted.Add(float64(bytesForEst)) + case metrics.SourceFallback: + gmc.prechargeSourceFallback.Inc() + gmc.prechargeBytesFallback.Add(float64(bytesForEst)) + } +} + +func (gmc *groupMetricsCollection) observePagingActual(source string, predicted, actual uint64) { + switch source { + case metrics.SourcePredicted: + gmc.actualBytesPredicted.Add(float64(actual)) + // Residual is only meaningful when the pre-charge used a learned hint; + // for fallback the "prediction" is just the paging budget. + gmc.predictionResidualBytes.Observe(float64(actual) - float64(predicted)) + case metrics.SourceFallback: + gmc.actualBytesFallback.Add(float64(actual)) } } @@ -551,6 +608,9 @@ func (gc *groupCostController) onRequestWaitImpl( for _, calc := range gc.calculators { calc.BeforeKVRequest(delta, info) } + if source, bytesForEst := estimatePrechargeSource(info); bytesForEst > 0 { + gc.metrics.observePagingPrecharge(source, bytesForEst) + } gc.mu.Lock() add(gc.mu.consumption, delta) @@ -601,6 +661,9 @@ func (gc *groupCostController) onResponseImpl( for _, calc := range gc.calculators { calc.AfterKVRequest(delta, req, resp) } + if source, bytesForEst := estimatePrechargeSource(req); bytesForEst > 0 { + gc.metrics.observePagingActual(source, bytesForEst, resp.ReadBytes()) + } if !gc.burstable.Load() { counter := gc.run.requestUnitTokens if v := getRUValueFromConsumption(delta); v > 0 { diff --git a/client/resource_group/controller/metrics/metrics.go b/client/resource_group/controller/metrics/metrics.go index 01231aceeaf..5e9670d3cbf 100644 --- a/client/resource_group/controller/metrics/metrics.go +++ b/client/resource_group/controller/metrics/metrics.go @@ -26,6 +26,15 @@ const ( newResourceGroupNameLabel = "resource_group" errType = "type" + + // sourceLabel distinguishes whether paging pre-charge used the learned + // PredictedReadBytes hint ("predicted") or fell back to PagingSizeBytes + // ("fallback"). Exported so call sites can pass the canonical value. + sourceLabel = "source" + // SourcePredicted is the sourceLabel value when pre-charge used a learned hint. + SourcePredicted = "predicted" + // SourceFallback is the sourceLabel value when pre-charge fell back to PagingSizeBytes. + SourceFallback = "fallback" ) var ( @@ -53,6 +62,20 @@ var ( FailedTokenRequestDuration prometheus.Observer // SuccessfulTokenRequestDuration comments placeholder, WithLabelValues is a heavy operation, define variable to avoid call it every time. SuccessfulTokenRequestDuration prometheus.Observer + + // PagingPrechargeSourceCounter counts paging pre-charge decisions by source + // (predicted hint vs PagingSizeBytes fallback), per resource group. + PagingPrechargeSourceCounter *prometheus.CounterVec + // PagingPrechargeBytesCounter sums bytes used as the pre-charge basis, + // partitioned by source so the predicted path can be isolated from fallback. + PagingPrechargeBytesCounter *prometheus.CounterVec + // PagingActualBytesCounter sums actual read bytes reported after the KV RPC, + // partitioned by the pre-charge source that was used. Ratio against + // PagingPrechargeBytesCounter reveals over-charge factor. + PagingActualBytesCounter *prometheus.CounterVec + // PagingPredictionResidualBytes observes (actual - predicted) bytes for + // requests that took the predicted path. Shows EMA prediction accuracy. + PagingPredictionResidualBytes *prometheus.HistogramVec ) func init() { @@ -156,6 +179,45 @@ func initMetrics(constLabels prometheus.Labels) { // WithLabelValues is a heavy operation, define variable to avoid call it every time. FailedTokenRequestDuration = TokenRequestDuration.WithLabelValues("fail") SuccessfulTokenRequestDuration = TokenRequestDuration.WithLabelValues("success") + + PagingPrechargeSourceCounter = prometheus.NewCounterVec( + prometheus.CounterOpts{ + Namespace: namespace, + Subsystem: requestSubsystem, + Name: "paging_precharge_source_total", + Help: "Counter of paging pre-charge decisions, partitioned by whether a learned PredictedReadBytes hint was used or the PagingSizeBytes fallback.", + ConstLabels: constLabels, + }, []string{newResourceGroupNameLabel, sourceLabel}) + + PagingPrechargeBytesCounter = prometheus.NewCounterVec( + prometheus.CounterOpts{ + Namespace: namespace, + Subsystem: requestSubsystem, + Name: "paging_precharge_bytes_total", + Help: "Sum of bytes used as the RC paging pre-charge basis, partitioned by source.", + ConstLabels: constLabels, + }, []string{newResourceGroupNameLabel, sourceLabel}) + + PagingActualBytesCounter = prometheus.NewCounterVec( + prometheus.CounterOpts{ + Namespace: namespace, + Subsystem: requestSubsystem, + Name: "paging_actual_bytes_total", + Help: "Sum of actual bytes read by paging KV requests, labelled by the pre-charge source that was used for the same request.", + ConstLabels: constLabels, + }, []string{newResourceGroupNameLabel, sourceLabel}) + + PagingPredictionResidualBytes = prometheus.NewHistogramVec( + prometheus.HistogramOpts{ + Namespace: namespace, + Subsystem: requestSubsystem, + Name: "paging_prediction_residual_bytes", + // Signed residual = actual - predicted. Buckets cover both directions + // up to the typical paging budget (a few MB). + Buckets: []float64{-4194304, -1048576, -262144, -65536, -16384, -4096, 0, 4096, 16384, 65536, 262144, 1048576, 4194304}, + Help: "Histogram of (actual_read_bytes - predicted_read_bytes) for requests that used the predicted pre-charge path. Shows EMA prediction accuracy.", + ConstLabels: constLabels, + }, []string{newResourceGroupNameLabel}) } // InitAndRegisterMetrics initializes and register metrics. @@ -171,4 +233,8 @@ func InitAndRegisterMetrics(constLabels prometheus.Labels) { prometheus.MustRegister(ResourceGroupTokenRequestCounter) prometheus.MustRegister(LowTokenRequestNotifyCounter) prometheus.MustRegister(TokenConsumedHistogram) + prometheus.MustRegister(PagingPrechargeSourceCounter) + prometheus.MustRegister(PagingPrechargeBytesCounter) + prometheus.MustRegister(PagingActualBytesCounter) + prometheus.MustRegister(PagingPredictionResidualBytes) } From 91d43ce533b3449a1e31141876661ba6f00c2241 Mon Sep 17 00:00:00 2001 From: Yuhao Zhang Date: Tue, 14 Apr 2026 18:09:44 +0800 Subject: [PATCH 06/10] client/resource_group: instrument paging actual-bytes on wait path too The paging actual-bytes counter was only updated from onResponseImpl, but most responses under RC throttling flow through onResponseWaitImpl, so the counter stayed at 0 in practice and the over-charge ratio metric could not be computed. Mirror the observePagingActual call into the wait path. Signed-off-by: Yuhao Zhang --- client/resource_group/controller/group_controller.go | 3 +++ 1 file changed, 3 insertions(+) diff --git a/client/resource_group/controller/group_controller.go b/client/resource_group/controller/group_controller.go index 3121807635a..83e90b54217 100644 --- a/client/resource_group/controller/group_controller.go +++ b/client/resource_group/controller/group_controller.go @@ -697,6 +697,9 @@ func (gc *groupCostController) onResponseWaitImpl( for _, calc := range gc.calculators { calc.AfterKVRequest(delta, req, resp) } + if source, bytesForEst := estimatePrechargeSource(req); bytesForEst > 0 { + gc.metrics.observePagingActual(source, bytesForEst, resp.ReadBytes()) + } var waitDuration time.Duration if !gc.burstable.Load() { v := getRUValueFromConsumption(delta) From 35983e9b95674b45f9d336df6f49006e59498b66 Mon Sep 17 00:00:00 2001 From: Yuhao Zhang Date: Mon, 20 Apr 2026 13:27:46 +0800 Subject: [PATCH 07/10] resource_control: drop PagingSizeBytes fallback for RC paging pre-charge Pre-charge now requires a learned PredictedReadBytes hint. Without a hint (EMA cold start or unhinted caller) the request is not pre-charged and is billed at Phase 2 by actual read bytes only. PagingSizeBytes is no longer consulted for pre-charge so the protocol-level paging cap and the RU-billing pre-charge are fully decoupled. Motivation: Round 2 data showed the fallback path over-charged 4-268x because PagingSizeBytes is a worst-case cap, not an expectation. Under concurrency this produced large artificial Phase 1 throttling, pushing QPS/latency into tier-dependent non-linear regressions. The cold window loses Phase 1 pre-throttling with this change; Phase 2 token-bucket billing still enforces the quota. Rewrote the model tests to reflect the new semantics: - TestPredictedReadBytesPreCharge asserts hint-driven pre-charge is unaffected by PagingSizeBytes. - TestNoPreChargeWithoutPredictedReadBytes asserts PagingSizeBytes alone no longer triggers pre-charge. - Existing refund/settlement tests now drive pre-charge via PredictedReadBytes instead of PagingSizeBytes. Signed-off-by: Yuhao Zhang --- .../controller/group_controller_test.go | 147 +++++++----------- client/resource_group/controller/model.go | 37 +++-- 2 files changed, 78 insertions(+), 106 deletions(-) diff --git a/client/resource_group/controller/group_controller_test.go b/client/resource_group/controller/group_controller_test.go index fcf8cad5177..729f3c39b57 100644 --- a/client/resource_group/controller/group_controller_test.go +++ b/client/resource_group/controller/group_controller_test.go @@ -208,112 +208,84 @@ func TestOnResponseWaitConsumption(t *testing.T) { verify() } -func TestPagingSizeBytesPreCharge(t *testing.T) { +func TestPredictedReadBytesPreCharge(t *testing.T) { re := require.New(t) cfg := DefaultRUConfig() kvCalc := newKVCalculator(cfg) - // Phase 1: BeforeKVRequest with pagingSizeBytes should pre-charge - // baseCost + pagingSizeBytes * ReadBytesCost. - pagingSizeBytes := uint64(4 * 1024 * 1024) // 4 MB + // Phase 1: BeforeKVRequest with a PredictedReadBytes hint should + // pre-charge baseCost + predictedReadBytes * ReadBytesCost, regardless + // of whether PagingSizeBytes is set. The two are decoupled. + pagingSizeBytes := uint64(4 * 1024 * 1024) // protocol-level cap only + predictedReadBytes := uint64(256 * 1024) // learned EMA estimate req := &TestRequestInfo{ - isWrite: false, - pagingSizeBytes: pagingSizeBytes, + isWrite: false, + pagingSizeBytes: pagingSizeBytes, + predictedReadBytes: predictedReadBytes, } phase1 := &rmpb.Consumption{} kvCalc.BeforeKVRequest(phase1, req) - baseCost := float64(cfg.ReadBaseCost) + float64(cfg.ReadPerBatchBaseCost)*0.7 - bytesCost := float64(cfg.ReadBytesCost) * float64(pagingSizeBytes) - re.InDelta(baseCost+bytesCost, phase1.RRU, 1e-6, - "Phase 1 should pre-charge baseCost + bytes RU") + baseCost := float64(cfg.ReadBaseCost) + float64(cfg.ReadPerBatchBaseCost)*defaultAvgBatchProportion + hintCost := float64(cfg.ReadBytesCost) * float64(predictedReadBytes) + re.InDelta(baseCost+hintCost, phase1.RRU, 1e-6, + "Phase 1 should pre-charge based on PredictedReadBytes") - // Phase 2: AfterKVRequest should subtract the pre-charged bytes RU. + // Phase 2 should subtract the same hint basis, preserving + // Phase 1 + Phase 2 == baseCost + actualCost. + actualReadBytes := uint64(300 * 1024) // close to the prediction resp := &TestResponseInfo{ - readBytes: 2 * 1024 * 1024, // actual read 2 MB + readBytes: actualReadBytes, kvCPU: 10 * time.Millisecond, succeed: true, } phase2 := &rmpb.Consumption{} kvCalc.AfterKVRequest(phase2, req, resp) - actualReadCost := float64(cfg.ReadBytesCost) * float64(resp.readBytes) + actualReadCost := float64(cfg.ReadBytesCost) * float64(actualReadBytes) cpuCost := float64(cfg.CPUMsCost) * 10.0 - expectedPhase2RRU := actualReadCost + cpuCost - bytesCost - re.InDelta(expectedPhase2RRU, phase2.RRU, 1e-6, - "Phase 2 should be actualCost - preCharged bytesCost") + re.InDelta(actualReadCost+cpuCost-hintCost, phase2.RRU, 1e-6, + "Phase 2 should settle using the same hint basis as Phase 1") - // Net total should equal baseCost + actualCost (no double-counting). totalRRU := phase1.RRU + phase2.RRU - expectedTotal := baseCost + actualReadCost + cpuCost - re.InDelta(expectedTotal, totalRRU, 1e-6, + re.InDelta(baseCost+actualReadCost+cpuCost, totalRRU, 1e-6, "Total RRU across Phase 1+2 should equal baseCost + actualCost") - - // Without pagingSizeBytes, Phase 1 should only charge baseCost. - reqNoPaging := &TestRequestInfo{isWrite: false} - noPaging := &rmpb.Consumption{} - kvCalc.BeforeKVRequest(noPaging, reqNoPaging) - re.InDelta(baseCost, noPaging.RRU, 1e-6, - "Without paging, Phase 1 should only charge baseCost") } -func TestPredictedReadBytesOverridesPagingSizeBytes(t *testing.T) { +func TestNoPreChargeWithoutPredictedReadBytes(t *testing.T) { re := require.New(t) cfg := DefaultRUConfig() kvCalc := newKVCalculator(cfg) + baseCost := float64(cfg.ReadBaseCost) + float64(cfg.ReadPerBatchBaseCost)*defaultAvgBatchProportion - // When PredictedReadBytes (the EMA hint) is > 0, it should override - // PagingSizeBytes as the pre-charge basis. PagingSizeBytes is kept as a - // safety cap and worst-case fallback; the hint gives a tighter estimate. - pagingSizeBytes := uint64(4 * 1024 * 1024) // 4 MB worst-case cap - predictedReadBytes := uint64(256 * 1024) // 256 KB learned estimate - req := &TestRequestInfo{ - isWrite: false, - pagingSizeBytes: pagingSizeBytes, - predictedReadBytes: predictedReadBytes, + // With PagingSizeBytes but no PredictedReadBytes hint, Phase 1 must + // NOT pre-charge: the two quantities are decoupled. Phase 2 bills the + // actual read bytes only. + pagingSizeBytes := uint64(4 * 1024 * 1024) + reqPagingOnly := &TestRequestInfo{ + isWrite: false, + pagingSizeBytes: pagingSizeBytes, } - phase1 := &rmpb.Consumption{} - kvCalc.BeforeKVRequest(phase1, req) - - baseCost := float64(cfg.ReadBaseCost) + float64(cfg.ReadPerBatchBaseCost)*defaultAvgBatchProportion - hintCost := float64(cfg.ReadBytesCost) * float64(predictedReadBytes) - re.InDelta(baseCost+hintCost, phase1.RRU, 1e-6, - "Phase 1 should pre-charge based on PredictedReadBytes, not PagingSizeBytes") + kvCalc.BeforeKVRequest(phase1, reqPagingOnly) + re.InDelta(baseCost, phase1.RRU, 1e-6, + "PagingSizeBytes alone must not trigger pre-charge") - // Phase 2 should subtract the same hint-based basis, preserving the - // invariant that Phase 1 + Phase 2 == baseCost + actualCost. - actualReadBytes := uint64(300 * 1024) // close to the prediction + actualReadBytes := uint64(2 * 1024 * 1024) resp := &TestResponseInfo{ readBytes: actualReadBytes, - kvCPU: 0, + kvCPU: 10 * time.Millisecond, succeed: true, } phase2 := &rmpb.Consumption{} - kvCalc.AfterKVRequest(phase2, req, resp) + kvCalc.AfterKVRequest(phase2, reqPagingOnly, resp) actualReadCost := float64(cfg.ReadBytesCost) * float64(actualReadBytes) - expectedPhase2RRU := actualReadCost - hintCost - re.InDelta(expectedPhase2RRU, phase2.RRU, 1e-6, - "Phase 2 should settle using the same hint basis as Phase 1") - - totalRRU := phase1.RRU + phase2.RRU - re.InDelta(baseCost+actualReadCost, totalRRU, 1e-6, - "Total RRU across Phase 1+2 should still equal baseCost + actualCost") - - // When the hint is zero, fall back to PagingSizeBytes (old behavior). - reqFallback := &TestRequestInfo{ - isWrite: false, - pagingSizeBytes: pagingSizeBytes, - // predictedReadBytes left at 0 - } - phase1Fallback := &rmpb.Consumption{} - kvCalc.BeforeKVRequest(phase1Fallback, reqFallback) - fallbackBytesCost := float64(cfg.ReadBytesCost) * float64(pagingSizeBytes) - re.InDelta(baseCost+fallbackBytesCost, phase1Fallback.RRU, 1e-6, - "With no hint, pre-charge should fall back to PagingSizeBytes") + cpuCost := float64(cfg.CPUMsCost) * 10.0 + re.InDelta(actualReadCost+cpuCost, phase2.RRU, 1e-6, + "Phase 2 should bill actual read cost only when nothing was pre-charged") - // When both are zero, no byte-based pre-charge is applied. + // Bare request without any hint or paging also pre-charges nothing. reqNone := &TestRequestInfo{isWrite: false} phase1None := &rmpb.Consumption{} kvCalc.BeforeKVRequest(phase1None, reqNone) @@ -333,12 +305,12 @@ func TestPagingPreChargeTokenRefund(t *testing.T) { newBurst: 0, }) - pagingSizeBytes := uint64(4 * 1024 * 1024) // 4 MB pre-charge - actualReadBytes := uint64(1 * 1024 * 1024) // 1 MB actual + predictedReadBytes := uint64(4 * 1024 * 1024) // 4 MB pre-charge + actualReadBytes := uint64(1 * 1024 * 1024) // 1 MB actual req := &TestRequestInfo{ - isWrite: false, - pagingSizeBytes: pagingSizeBytes, + isWrite: false, + predictedReadBytes: predictedReadBytes, } resp := &TestResponseInfo{ readBytes: actualReadBytes, @@ -358,7 +330,7 @@ func TestPagingPreChargeTokenRefund(t *testing.T) { // The limiter should have more tokens after settlement than after pre-charge, // because the refund (pre-charge - actual) exceeds the actual read cost. cfg := DefaultRUConfig() - preChargeCost := float64(cfg.ReadBytesCost) * float64(pagingSizeBytes) + preChargeCost := float64(cfg.ReadBytesCost) * float64(predictedReadBytes) actualCost := float64(cfg.ReadBytesCost) * float64(actualReadBytes) expectedRefund := preChargeCost - actualCost re.Positive(expectedRefund, "sanity: pre-charge should exceed actual cost") @@ -387,12 +359,12 @@ func TestPagingPreChargeNoRefundWhenActualExceedsEstimate(t *testing.T) { newBurst: 0, }) - pagingSizeBytes := uint64(1 * 1024 * 1024) // 1 MB pre-charge - actualReadBytes := uint64(4 * 1024 * 1024) // 4 MB actual (exceeds estimate) + predictedReadBytes := uint64(1 * 1024 * 1024) // 1 MB pre-charge + actualReadBytes := uint64(4 * 1024 * 1024) // 4 MB actual (exceeds estimate) req := &TestRequestInfo{ - isWrite: false, - pagingSizeBytes: pagingSizeBytes, + isWrite: false, + predictedReadBytes: predictedReadBytes, } resp := &TestResponseInfo{ readBytes: actualReadBytes, @@ -423,12 +395,12 @@ func TestOnResponseImplPagingRefund(t *testing.T) { newBurst: 0, }) - pagingSizeBytes := uint64(4 * 1024 * 1024) // 4 MB pre-charge - actualReadBytes := uint64(512 * 1024) // 512 KB actual + predictedReadBytes := uint64(4 * 1024 * 1024) // 4 MB pre-charge + actualReadBytes := uint64(512 * 1024) // 512 KB actual req := &TestRequestInfo{ - isWrite: false, - pagingSizeBytes: pagingSizeBytes, + isWrite: false, + predictedReadBytes: predictedReadBytes, } resp := &TestResponseInfo{ readBytes: actualReadBytes, @@ -461,15 +433,16 @@ func TestPagingPreChargeZeroDelta(t *testing.T) { newBurst: 0, }) - // Set actual read bytes equal to pagingSizeBytes so the read-bytes delta is zero. - // The only settlement cost should be CPU time (which we set to zero here). - pagingSizeBytes := uint64(2 * 1024 * 1024) + // Set actual read bytes equal to the predicted bytes so the read-bytes + // delta is zero. The only settlement cost should be CPU time (which + // we set to zero here). + predictedReadBytes := uint64(2 * 1024 * 1024) req := &TestRequestInfo{ - isWrite: false, - pagingSizeBytes: pagingSizeBytes, + isWrite: false, + predictedReadBytes: predictedReadBytes, } resp := &TestResponseInfo{ - readBytes: pagingSizeBytes, // exact match + readBytes: predictedReadBytes, // exact match succeed: true, } diff --git a/client/resource_group/controller/model.go b/client/resource_group/controller/model.go index 5c49f91aedc..0d37576809a 100644 --- a/client/resource_group/controller/model.go +++ b/client/resource_group/controller/model.go @@ -60,27 +60,26 @@ type RequestInfo interface { // predictedReadBytesProvider is an optional interface that a RequestInfo // implementation may satisfy to supply a learned estimate (e.g., from a // per-logical-scan EMA maintained in TiDB) of how many bytes the request -// will read. When present and > 0, it overrides PagingSizeBytes as the -// byte basis for RC paging pre-charge. +// will read. When present and > 0, it is used as the byte basis for RC +// paging pre-charge; when absent or zero the request is not pre-charged +// and will be billed at Phase 2 by actual read bytes only. // // Defined as an optional interface (not a method on RequestInfo) so older -// RequestInfo implementations that have not been updated still compile and -// behave as before (falling back to PagingSizeBytes). +// RequestInfo implementations that have not been updated still compile; +// they simply skip pre-charge. type predictedReadBytesProvider interface { PredictedReadBytes() uint64 } -// estimatedReadBytes returns the byte basis used for RC paging pre-charge. -// It prefers a learned PredictedReadBytes hint when the RequestInfo -// implements the optional provider and returns a non-zero value; otherwise -// it falls back to PagingSizeBytes (the paging byte budget / worst-case cap). +// estimatedReadBytes returns the byte basis used for RC paging pre-charge, +// or 0 to skip pre-charge entirely. Only a learned PredictedReadBytes hint +// is honored: PagingSizeBytes is intentionally ignored here so that the +// protocol-level paging cap and the RU-billing pre-charge stay decoupled. func estimatedReadBytes(req RequestInfo) uint64 { if p, ok := req.(predictedReadBytesProvider); ok { - if hint := p.PredictedReadBytes(); hint > 0 { - return hint - } + return p.PredictedReadBytes() } - return req.PagingSizeBytes() + return 0 } // ResponseInfo is the interface of the response information provider. A response should be @@ -131,11 +130,11 @@ func (kc *KVCalculator) BeforeKVRequest(consumption *rmpb.Consumption, req Reque } else { consumption.KvReadRpcCount += 1 consumption.RRU += float64(kc.ReadBaseCost) + float64(kc.ReadPerBatchBaseCost)*defaultAvgBatchProportion - // RC Paging pre-charge: if the request has a byte budget, pre-charge - // the estimated read bytes RU so that concurrent workers are throttled - // at Phase 1 instead of all hitting Phase 2 at once. Prefer a learned - // PredictedReadBytes hint when the caller supplies one; otherwise fall - // back to PagingSizeBytes (the paging byte budget / worst-case cap). + // RC Paging pre-charge: pre-charge the learned read-bytes RU so that + // concurrent workers are throttled at Phase 1 instead of all hitting + // Phase 2 at once. Only applies when the caller supplies a + // PredictedReadBytes hint; without it the request is billed at + // Phase 2 by actual read bytes only. if bytesForEst := estimatedReadBytes(req); bytesForEst > 0 { consumption.RRU += float64(kc.ReadBytesCost) * float64(bytesForEst) } @@ -175,8 +174,8 @@ func (kc *KVCalculator) AfterKVRequest(consumption *rmpb.Consumption, req Reques kc.calculateCPUCost(consumption, res) // RC Paging settlement: subtract the pre-charged bytes RU added in // BeforeKVRequest so the net total (Phase 1 + Phase 2) equals - // baseCost + actualCost. Use the same basis (hint or PagingSizeBytes) - // that BeforeKVRequest used. + // baseCost + actualCost. Symmetric with BeforeKVRequest: when no + // hint was supplied nothing was pre-charged, so nothing is refunded. if bytesForEst := estimatedReadBytes(req); bytesForEst > 0 { consumption.RRU -= float64(kc.ReadBytesCost) * float64(bytesForEst) } From be76d48a4027ed63df95023bcf3f6ab1033ea984 Mon Sep 17 00:00:00 2001 From: Yuhao Zhang Date: Mon, 20 Apr 2026 13:27:55 +0800 Subject: [PATCH 08/10] client/resource_group: drop fallback bucket from paging pre-charge metrics Follow-up to removing the PagingSizeBytes fallback: the source label on paging pre-charge metrics had only two values (predicted / fallback) and fallback is no longer reachable. Drop the label dimension entirely and rename PagingPrechargeSourceCounter to PagingPrechargeCounter; the remaining three counters and one histogram now measure pre-charged requests only (those with a PredictedReadBytes hint > 0). Inlined estimatePrechargeSource at its three call sites by calling estimatedReadBytes directly, and simplified observePagingPrecharge / observePagingActual to their single remaining case. Signed-off-by: Yuhao Zhang --- .../controller/group_controller.go | 85 ++++++------------- .../controller/metrics/metrics.go | 47 +++++----- 2 files changed, 47 insertions(+), 85 deletions(-) diff --git a/client/resource_group/controller/group_controller.go b/client/resource_group/controller/group_controller.go index 83e90b54217..8cb00248339 100644 --- a/client/resource_group/controller/group_controller.go +++ b/client/resource_group/controller/group_controller.go @@ -107,15 +107,13 @@ type groupMetricsCollection struct { runningKVRequestCounter prometheus.Gauge consumeTokenHistogram prometheus.Observer - // Paging pre-charge observability: cached per-(RG, source) so the hot path - // avoids WithLabelValues on every KV request. - prechargeSourcePredicted prometheus.Counter - prechargeSourceFallback prometheus.Counter - prechargeBytesPredicted prometheus.Counter - prechargeBytesFallback prometheus.Counter - actualBytesPredicted prometheus.Counter - actualBytesFallback prometheus.Counter - predictionResidualBytes prometheus.Observer + // Paging pre-charge observability: cached per-RG so the hot path avoids + // WithLabelValues on every KV request. Only pre-charged requests (those + // with a PredictedReadBytes hint > 0) contribute to these metrics. + prechargeCounter prometheus.Counter + prechargeBytesCounter prometheus.Counter + actualBytesCounter prometheus.Counter + predictionResidualBytes prometheus.Observer } func initMetrics(oldName, name string) *groupMetricsCollection { @@ -133,53 +131,26 @@ func initMetrics(oldName, name string) *groupMetricsCollection { runningKVRequestCounter: metrics.GroupRunningKVRequestCounter.WithLabelValues(name), consumeTokenHistogram: metrics.TokenConsumedHistogram.WithLabelValues(name), - prechargeSourcePredicted: metrics.PagingPrechargeSourceCounter.WithLabelValues(name, metrics.SourcePredicted), - prechargeSourceFallback: metrics.PagingPrechargeSourceCounter.WithLabelValues(name, metrics.SourceFallback), - prechargeBytesPredicted: metrics.PagingPrechargeBytesCounter.WithLabelValues(name, metrics.SourcePredicted), - prechargeBytesFallback: metrics.PagingPrechargeBytesCounter.WithLabelValues(name, metrics.SourceFallback), - actualBytesPredicted: metrics.PagingActualBytesCounter.WithLabelValues(name, metrics.SourcePredicted), - actualBytesFallback: metrics.PagingActualBytesCounter.WithLabelValues(name, metrics.SourceFallback), - predictionResidualBytes: metrics.PagingPredictionResidualBytes.WithLabelValues(name), + prechargeCounter: metrics.PagingPrechargeCounter.WithLabelValues(name), + prechargeBytesCounter: metrics.PagingPrechargeBytesCounter.WithLabelValues(name), + actualBytesCounter: metrics.PagingActualBytesCounter.WithLabelValues(name), + predictionResidualBytes: metrics.PagingPredictionResidualBytes.WithLabelValues(name), } } -// estimatePrechargeSource reports which source RC paging pre-charge will use -// for req and the byte basis it would charge. Mirrors estimatedReadBytes in -// model.go but also returns the source label so we can instrument per path. -// Returns ("", 0) when there is no paging pre-charge to observe. -func estimatePrechargeSource(req RequestInfo) (source string, bytesForEst uint64) { - if p, ok := req.(predictedReadBytesProvider); ok { - if hint := p.PredictedReadBytes(); hint > 0 { - return metrics.SourcePredicted, hint - } - } - if b := req.PagingSizeBytes(); b > 0 { - return metrics.SourceFallback, b - } - return "", 0 +// observePagingPrecharge records one pre-charge event. Caller must guarantee +// bytesForEst > 0; cold-start requests with no hint are not pre-charged and +// should not be observed here. +func (gmc *groupMetricsCollection) observePagingPrecharge(bytesForEst uint64) { + gmc.prechargeCounter.Inc() + gmc.prechargeBytesCounter.Add(float64(bytesForEst)) } -func (gmc *groupMetricsCollection) observePagingPrecharge(source string, bytesForEst uint64) { - switch source { - case metrics.SourcePredicted: - gmc.prechargeSourcePredicted.Inc() - gmc.prechargeBytesPredicted.Add(float64(bytesForEst)) - case metrics.SourceFallback: - gmc.prechargeSourceFallback.Inc() - gmc.prechargeBytesFallback.Add(float64(bytesForEst)) - } -} - -func (gmc *groupMetricsCollection) observePagingActual(source string, predicted, actual uint64) { - switch source { - case metrics.SourcePredicted: - gmc.actualBytesPredicted.Add(float64(actual)) - // Residual is only meaningful when the pre-charge used a learned hint; - // for fallback the "prediction" is just the paging budget. - gmc.predictionResidualBytes.Observe(float64(actual) - float64(predicted)) - case metrics.SourceFallback: - gmc.actualBytesFallback.Add(float64(actual)) - } +// observePagingActual records actual read bytes and the signed residual for a +// pre-charged request. Caller must guarantee predicted > 0. +func (gmc *groupMetricsCollection) observePagingActual(predicted, actual uint64) { + gmc.actualBytesCounter.Add(float64(actual)) + gmc.predictionResidualBytes.Observe(float64(actual) - float64(predicted)) } type tokenCounter struct { @@ -608,8 +579,8 @@ func (gc *groupCostController) onRequestWaitImpl( for _, calc := range gc.calculators { calc.BeforeKVRequest(delta, info) } - if source, bytesForEst := estimatePrechargeSource(info); bytesForEst > 0 { - gc.metrics.observePagingPrecharge(source, bytesForEst) + if bytesForEst := estimatedReadBytes(info); bytesForEst > 0 { + gc.metrics.observePagingPrecharge(bytesForEst) } gc.mu.Lock() @@ -661,8 +632,8 @@ func (gc *groupCostController) onResponseImpl( for _, calc := range gc.calculators { calc.AfterKVRequest(delta, req, resp) } - if source, bytesForEst := estimatePrechargeSource(req); bytesForEst > 0 { - gc.metrics.observePagingActual(source, bytesForEst, resp.ReadBytes()) + if bytesForEst := estimatedReadBytes(req); bytesForEst > 0 { + gc.metrics.observePagingActual(bytesForEst, resp.ReadBytes()) } if !gc.burstable.Load() { counter := gc.run.requestUnitTokens @@ -697,8 +668,8 @@ func (gc *groupCostController) onResponseWaitImpl( for _, calc := range gc.calculators { calc.AfterKVRequest(delta, req, resp) } - if source, bytesForEst := estimatePrechargeSource(req); bytesForEst > 0 { - gc.metrics.observePagingActual(source, bytesForEst, resp.ReadBytes()) + if bytesForEst := estimatedReadBytes(req); bytesForEst > 0 { + gc.metrics.observePagingActual(bytesForEst, resp.ReadBytes()) } var waitDuration time.Duration if !gc.burstable.Load() { diff --git a/client/resource_group/controller/metrics/metrics.go b/client/resource_group/controller/metrics/metrics.go index 5e9670d3cbf..4e9e942fe8a 100644 --- a/client/resource_group/controller/metrics/metrics.go +++ b/client/resource_group/controller/metrics/metrics.go @@ -26,15 +26,6 @@ const ( newResourceGroupNameLabel = "resource_group" errType = "type" - - // sourceLabel distinguishes whether paging pre-charge used the learned - // PredictedReadBytes hint ("predicted") or fell back to PagingSizeBytes - // ("fallback"). Exported so call sites can pass the canonical value. - sourceLabel = "source" - // SourcePredicted is the sourceLabel value when pre-charge used a learned hint. - SourcePredicted = "predicted" - // SourceFallback is the sourceLabel value when pre-charge fell back to PagingSizeBytes. - SourceFallback = "fallback" ) var ( @@ -63,18 +54,18 @@ var ( // SuccessfulTokenRequestDuration comments placeholder, WithLabelValues is a heavy operation, define variable to avoid call it every time. SuccessfulTokenRequestDuration prometheus.Observer - // PagingPrechargeSourceCounter counts paging pre-charge decisions by source - // (predicted hint vs PagingSizeBytes fallback), per resource group. - PagingPrechargeSourceCounter *prometheus.CounterVec - // PagingPrechargeBytesCounter sums bytes used as the pre-charge basis, - // partitioned by source so the predicted path can be isolated from fallback. + // PagingPrechargeCounter counts paging pre-charge events (predicted hint + // present and > 0), per resource group. Cold starts and unhinted requests + // are not pre-charged and therefore not counted here. + PagingPrechargeCounter *prometheus.CounterVec + // PagingPrechargeBytesCounter sums bytes used as the pre-charge basis. PagingPrechargeBytesCounter *prometheus.CounterVec - // PagingActualBytesCounter sums actual read bytes reported after the KV RPC, - // partitioned by the pre-charge source that was used. Ratio against - // PagingPrechargeBytesCounter reveals over-charge factor. + // PagingActualBytesCounter sums actual read bytes reported after the KV + // RPC for pre-charged requests. Ratio against PagingPrechargeBytesCounter + // reveals over/under-charge factor. PagingActualBytesCounter *prometheus.CounterVec // PagingPredictionResidualBytes observes (actual - predicted) bytes for - // requests that took the predicted path. Shows EMA prediction accuracy. + // pre-charged requests. Shows EMA prediction accuracy. PagingPredictionResidualBytes *prometheus.HistogramVec ) @@ -180,32 +171,32 @@ func initMetrics(constLabels prometheus.Labels) { FailedTokenRequestDuration = TokenRequestDuration.WithLabelValues("fail") SuccessfulTokenRequestDuration = TokenRequestDuration.WithLabelValues("success") - PagingPrechargeSourceCounter = prometheus.NewCounterVec( + PagingPrechargeCounter = prometheus.NewCounterVec( prometheus.CounterOpts{ Namespace: namespace, Subsystem: requestSubsystem, - Name: "paging_precharge_source_total", - Help: "Counter of paging pre-charge decisions, partitioned by whether a learned PredictedReadBytes hint was used or the PagingSizeBytes fallback.", + Name: "paging_precharge_total", + Help: "Counter of RC paging pre-charge events (PredictedReadBytes hint present and > 0).", ConstLabels: constLabels, - }, []string{newResourceGroupNameLabel, sourceLabel}) + }, []string{newResourceGroupNameLabel}) PagingPrechargeBytesCounter = prometheus.NewCounterVec( prometheus.CounterOpts{ Namespace: namespace, Subsystem: requestSubsystem, Name: "paging_precharge_bytes_total", - Help: "Sum of bytes used as the RC paging pre-charge basis, partitioned by source.", + Help: "Sum of bytes used as the RC paging pre-charge basis (predicted hint).", ConstLabels: constLabels, - }, []string{newResourceGroupNameLabel, sourceLabel}) + }, []string{newResourceGroupNameLabel}) PagingActualBytesCounter = prometheus.NewCounterVec( prometheus.CounterOpts{ Namespace: namespace, Subsystem: requestSubsystem, Name: "paging_actual_bytes_total", - Help: "Sum of actual bytes read by paging KV requests, labelled by the pre-charge source that was used for the same request.", + Help: "Sum of actual bytes read by paging KV requests that were pre-charged.", ConstLabels: constLabels, - }, []string{newResourceGroupNameLabel, sourceLabel}) + }, []string{newResourceGroupNameLabel}) PagingPredictionResidualBytes = prometheus.NewHistogramVec( prometheus.HistogramOpts{ @@ -215,7 +206,7 @@ func initMetrics(constLabels prometheus.Labels) { // Signed residual = actual - predicted. Buckets cover both directions // up to the typical paging budget (a few MB). Buckets: []float64{-4194304, -1048576, -262144, -65536, -16384, -4096, 0, 4096, 16384, 65536, 262144, 1048576, 4194304}, - Help: "Histogram of (actual_read_bytes - predicted_read_bytes) for requests that used the predicted pre-charge path. Shows EMA prediction accuracy.", + Help: "Histogram of (actual_read_bytes - predicted_read_bytes) for pre-charged requests. Shows EMA prediction accuracy.", ConstLabels: constLabels, }, []string{newResourceGroupNameLabel}) } @@ -233,7 +224,7 @@ func InitAndRegisterMetrics(constLabels prometheus.Labels) { prometheus.MustRegister(ResourceGroupTokenRequestCounter) prometheus.MustRegister(LowTokenRequestNotifyCounter) prometheus.MustRegister(TokenConsumedHistogram) - prometheus.MustRegister(PagingPrechargeSourceCounter) + prometheus.MustRegister(PagingPrechargeCounter) prometheus.MustRegister(PagingPrechargeBytesCounter) prometheus.MustRegister(PagingActualBytesCounter) prometheus.MustRegister(PagingPredictionResidualBytes) From 845da8e9b72c12bb95fa6a06573a1a2b53cd1e14 Mon Sep 17 00:00:00 2001 From: Yuhao Zhang Date: Mon, 20 Apr 2026 13:57:39 +0800 Subject: [PATCH 09/10] client/resource_group: add nonprecharge metrics for cold-path RPCs Record RPCs that implement the predicted-bytes hint interface but report zero (EMA cold-start or feature-disabled) and therefore skip Phase 1 pre-charge. Two new counters tagged by resource group: - paging_nonprecharge_total: count of bypassed RPCs - paging_nonprecharge_actual_bytes_total: actual bytes read by them Observed from the Phase 2 settlement path alongside the existing pre-charge actual-bytes metric, gated on non-write requests. Signed-off-by: Yuhao Zhang --- client/go.mod | 1 + client/go.sum | 2 + .../controller/group_controller.go | 27 +++++++++ .../controller/group_controller_test.go | 55 +++++++++++++++++++ .../controller/metrics/metrics.go | 31 +++++++++++ 5 files changed, 116 insertions(+) diff --git a/client/go.mod b/client/go.mod index 205e08decc3..9bc83b7043f 100644 --- a/client/go.mod +++ b/client/go.mod @@ -26,6 +26,7 @@ require ( github.com/cespare/xxhash/v2 v2.3.0 // indirect github.com/davecgh/go-spew v1.1.1 // indirect github.com/golang/protobuf v1.5.4 // indirect + github.com/kylelemons/godebug v1.1.0 // indirect github.com/munnerz/goautoneg v0.0.0-20191010083416-a7dc8b61c822 // indirect github.com/pkg/errors v0.9.1 // indirect github.com/pmezard/go-difflib v1.0.0 // indirect diff --git a/client/go.sum b/client/go.sum index c9c66d687db..36f603d7aa9 100644 --- a/client/go.sum +++ b/client/go.sum @@ -38,6 +38,8 @@ github.com/kr/pretty v0.3.1/go.mod h1:hoEshYVHaxMs3cyo3Yncou5ZscifuDolrwPKZanG3x github.com/kr/pty v1.1.1/go.mod h1:pFQYn66WHrOpPYNljwOMqo10TkYh1fy3cYio2l3bCsQ= github.com/kr/text v0.1.0 h1:45sCR5RtlFHMR4UwH9sdQ5TC8v0qDQCHnXt+kaKSTVE= github.com/kr/text v0.1.0/go.mod h1:4Jbv+DJW3UT/LiOwJeYQe1efqtUx/iVham/4vfdArNI= +github.com/kylelemons/godebug v1.1.0 h1:RPNrshWIDI6G2gRW9EHilWtl7Z6Sb1BR0xunSBf0SNc= +github.com/kylelemons/godebug v1.1.0/go.mod h1:9/0rRGxNHcop5bhtWyNeEfOS8JIWk580+fNqagV/RAw= github.com/munnerz/goautoneg v0.0.0-20191010083416-a7dc8b61c822 h1:C3w9PqII01/Oq1c1nUAm88MOHcQC9l5mIlSMApZMrHA= github.com/munnerz/goautoneg v0.0.0-20191010083416-a7dc8b61c822/go.mod h1:+n7T8mK8HuQTcFwEeznm/DIxMOiR9yIdICNftLE1DvQ= github.com/nxadm/tail v1.4.8 h1:nPr65rt6Y5JFSKQO7qToXr7pePgD6Gwiw05lkbyAQTE= diff --git a/client/resource_group/controller/group_controller.go b/client/resource_group/controller/group_controller.go index 8cb00248339..de5fcc35e81 100644 --- a/client/resource_group/controller/group_controller.go +++ b/client/resource_group/controller/group_controller.go @@ -114,6 +114,12 @@ type groupMetricsCollection struct { prechargeBytesCounter prometheus.Counter actualBytesCounter prometheus.Counter predictionResidualBytes prometheus.Observer + + // Nonprecharge bucket: RPCs that implemented the predicted-bytes + // interface but reported 0 (EMA cold-start or feature-disabled) and + // therefore skipped Phase 1 pre-charge entirely. Recorded at Phase 2. + nonprechargeCounter prometheus.Counter + nonprechargeActualBytes prometheus.Counter } func initMetrics(oldName, name string) *groupMetricsCollection { @@ -135,6 +141,9 @@ func initMetrics(oldName, name string) *groupMetricsCollection { prechargeBytesCounter: metrics.PagingPrechargeBytesCounter.WithLabelValues(name), actualBytesCounter: metrics.PagingActualBytesCounter.WithLabelValues(name), predictionResidualBytes: metrics.PagingPredictionResidualBytes.WithLabelValues(name), + + nonprechargeCounter: metrics.PagingNonprechargeCounter.WithLabelValues(name), + nonprechargeActualBytes: metrics.PagingNonprechargeActualBytes.WithLabelValues(name), } } @@ -153,6 +162,16 @@ func (gmc *groupMetricsCollection) observePagingActual(predicted, actual uint64) gmc.predictionResidualBytes.Observe(float64(actual) - float64(predicted)) } +// observePagingNonprecharge records one RPC that implemented the predicted +// read-bytes interface but reported 0 (EMA cold or feature disabled) and +// therefore ran without Phase 1 pre-charge. `actual` is the response's read +// bytes, settled at Phase 2. Paired with observePagingPrecharge this gives +// the cold/ready split and the byte volume that bypassed throttling. +func (gmc *groupMetricsCollection) observePagingNonprecharge(actual uint64) { + gmc.nonprechargeCounter.Inc() + gmc.nonprechargeActualBytes.Add(float64(actual)) +} + type tokenCounter struct { fillRate uint64 @@ -634,6 +653,10 @@ func (gc *groupCostController) onResponseImpl( } if bytesForEst := estimatedReadBytes(req); bytesForEst > 0 { gc.metrics.observePagingActual(bytesForEst, resp.ReadBytes()) + } else if !req.IsWrite() { + if _, ok := req.(predictedReadBytesProvider); ok { + gc.metrics.observePagingNonprecharge(resp.ReadBytes()) + } } if !gc.burstable.Load() { counter := gc.run.requestUnitTokens @@ -670,6 +693,10 @@ func (gc *groupCostController) onResponseWaitImpl( } if bytesForEst := estimatedReadBytes(req); bytesForEst > 0 { gc.metrics.observePagingActual(bytesForEst, resp.ReadBytes()) + } else if !req.IsWrite() { + if _, ok := req.(predictedReadBytesProvider); ok { + gc.metrics.observePagingNonprecharge(resp.ReadBytes()) + } } var waitDuration time.Duration if !gc.burstable.Load() { diff --git a/client/resource_group/controller/group_controller_test.go b/client/resource_group/controller/group_controller_test.go index 729f3c39b57..52b4a5fc224 100644 --- a/client/resource_group/controller/group_controller_test.go +++ b/client/resource_group/controller/group_controller_test.go @@ -20,11 +20,13 @@ import ( "testing" "time" + "github.com/prometheus/client_golang/prometheus/testutil" "github.com/stretchr/testify/require" rmpb "github.com/pingcap/kvproto/pkg/resource_manager" "github.com/tikv/pd/client/errs" + "github.com/tikv/pd/client/resource_group/controller/metrics" ) func createTestGroupCostController(re *require.Assertions) *groupCostController { @@ -422,6 +424,59 @@ func TestOnResponseImplPagingRefund(t *testing.T) { "onResponseImpl should refund excess pre-charged tokens") } +func TestNonprechargeMetricsRecordedWhenHintZero(t *testing.T) { + re := require.New(t) + gc := createTestGroupCostController(re) + + nonprechargeCounter := metrics.PagingNonprechargeCounter.WithLabelValues(gc.name) + nonprechargeBytes := metrics.PagingNonprechargeActualBytes.WithLabelValues(gc.name) + prechargeCounter := metrics.PagingPrechargeCounter.WithLabelValues(gc.name) + actualBytes := metrics.PagingActualBytesCounter.WithLabelValues(gc.name) + + countBefore := testutil.ToFloat64(nonprechargeCounter) + bytesBefore := testutil.ToFloat64(nonprechargeBytes) + prechargeBefore := testutil.ToFloat64(prechargeCounter) + actualBytesBefore := testutil.ToFloat64(actualBytes) + + // Hint=0 read: implements predictedReadBytesProvider but returns 0 -> + // Phase 1 skips pre-charge, Phase 2 must record nonprecharge bytes. + const readBytesAmount = uint64(256 * 1024) + coldReq := &TestRequestInfo{isWrite: false} + coldResp := &TestResponseInfo{readBytes: readBytesAmount, succeed: true} + + _, err := gc.onResponseImpl(coldReq, coldResp) + re.NoError(err) + re.InDelta(countBefore+1, testutil.ToFloat64(nonprechargeCounter), 1e-6, + "hint=0 read should increment nonprecharge counter") + re.InDelta(bytesBefore+float64(readBytesAmount), + testutil.ToFloat64(nonprechargeBytes), 1e-6, + "nonprecharge bytes must equal Phase 2 actual read bytes") + re.InDelta(prechargeBefore, testutil.ToFloat64(prechargeCounter), 1e-6, + "precharge counter must stay unchanged when hint=0") + re.InDelta(actualBytesBefore, testutil.ToFloat64(actualBytes), 1e-6, + "warm-path actual-bytes counter must stay unchanged when hint=0") + + // Sanity: a hint>0 read should hit the warm path, not the cold path. + warmReq := &TestRequestInfo{isWrite: false, predictedReadBytes: 1 * 1024 * 1024} + warmResp := &TestResponseInfo{readBytes: readBytesAmount, succeed: true} + _, _, _, _, err = gc.onRequestWaitImpl(context.TODO(), warmReq) + re.NoError(err) + _, err = gc.onResponseImpl(warmReq, warmResp) + re.NoError(err) + re.InDelta(countBefore+1, testutil.ToFloat64(nonprechargeCounter), 1e-6, + "warm request must not touch nonprecharge counter") + re.InDelta(prechargeBefore+1, testutil.ToFloat64(prechargeCounter), 1e-6, + "warm request should increment precharge counter") + + // Writes skip both buckets entirely. + writeReq := &TestRequestInfo{isWrite: true, writeBytes: 1024} + writeResp := &TestResponseInfo{readBytes: 0, succeed: true} + _, err = gc.onResponseImpl(writeReq, writeResp) + re.NoError(err) + re.InDelta(countBefore+1, testutil.ToFloat64(nonprechargeCounter), 1e-6, + "write must not touch nonprecharge counter") +} + func TestPagingPreChargeZeroDelta(t *testing.T) { re := require.New(t) gc := createTestGroupCostController(re) diff --git a/client/resource_group/controller/metrics/metrics.go b/client/resource_group/controller/metrics/metrics.go index 4e9e942fe8a..393bebd6be4 100644 --- a/client/resource_group/controller/metrics/metrics.go +++ b/client/resource_group/controller/metrics/metrics.go @@ -67,6 +67,17 @@ var ( // PagingPredictionResidualBytes observes (actual - predicted) bytes for // pre-charged requests. Shows EMA prediction accuracy. PagingPredictionResidualBytes *prometheus.HistogramVec + + // PagingNonprechargeCounter counts RPCs that implemented the predicted + // read-bytes interface but reported 0 (e.g. EMA cold-start) and therefore + // ran without Phase 1 pre-charge. Paired with PagingPrechargeCounter this + // yields the cold/ready RPC split from the PD side. + PagingNonprechargeCounter *prometheus.CounterVec + // PagingNonprechargeActualBytes sums the actual read bytes of RPCs that + // skipped pre-charge. Quantifies how much read volume bypassed Phase 1 + // throttling — the "cold window" signal for token-bucket pressure + // analysis. + PagingNonprechargeActualBytes *prometheus.CounterVec ) func init() { @@ -209,6 +220,24 @@ func initMetrics(constLabels prometheus.Labels) { Help: "Histogram of (actual_read_bytes - predicted_read_bytes) for pre-charged requests. Shows EMA prediction accuracy.", ConstLabels: constLabels, }, []string{newResourceGroupNameLabel}) + + PagingNonprechargeCounter = prometheus.NewCounterVec( + prometheus.CounterOpts{ + Namespace: namespace, + Subsystem: requestSubsystem, + Name: "paging_nonprecharge_total", + Help: "Counter of RC paging RPCs that implemented the predicted hint but reported 0 (e.g. EMA cold-start) and ran without Phase 1 pre-charge.", + ConstLabels: constLabels, + }, []string{newResourceGroupNameLabel}) + + PagingNonprechargeActualBytes = prometheus.NewCounterVec( + prometheus.CounterOpts{ + Namespace: namespace, + Subsystem: requestSubsystem, + Name: "paging_nonprecharge_actual_bytes_total", + Help: "Sum of actual bytes read by paging RPCs that skipped pre-charge (hint=0). Quantifies cold-window read volume bypassing Phase 1 throttling.", + ConstLabels: constLabels, + }, []string{newResourceGroupNameLabel}) } // InitAndRegisterMetrics initializes and register metrics. @@ -228,4 +257,6 @@ func InitAndRegisterMetrics(constLabels prometheus.Labels) { prometheus.MustRegister(PagingPrechargeBytesCounter) prometheus.MustRegister(PagingActualBytesCounter) prometheus.MustRegister(PagingPredictionResidualBytes) + prometheus.MustRegister(PagingNonprechargeCounter) + prometheus.MustRegister(PagingNonprechargeActualBytes) } From d6521789556ad38267ba0a62cab31bf0487d6819 Mon Sep 17 00:00:00 2001 From: Yuhao Zhang Date: Tue, 21 Apr 2026 15:19:51 +0800 Subject: [PATCH 10/10] client/resource_group: drop ad-hoc Phase 1/Phase 2 wording from paging precharge Replace "Phase 1" / "Phase 2" in comments and test names with the existing API terms (BeforeKVRequest pre-charge / AfterKVRequest settle). The Phase 1/2 framing was introduced only in this branch's own commits and is not part of any preexisting controller convention; staying with the BeforeKVRequest/AfterKVRequest vocabulary keeps the prose readable to anyone already familiar with ResourceCalculator. No behavior change; only comments, test variable names, and metric Help strings. Signed-off-by: Yuhao Zhang --- .../controller/group_controller.go | 7 ++- .../controller/group_controller_test.go | 63 ++++++++++--------- .../controller/metrics/metrics.go | 10 +-- client/resource_group/controller/model.go | 12 ++-- 4 files changed, 47 insertions(+), 45 deletions(-) diff --git a/client/resource_group/controller/group_controller.go b/client/resource_group/controller/group_controller.go index de5fcc35e81..d0bc2601171 100644 --- a/client/resource_group/controller/group_controller.go +++ b/client/resource_group/controller/group_controller.go @@ -117,7 +117,8 @@ type groupMetricsCollection struct { // Nonprecharge bucket: RPCs that implemented the predicted-bytes // interface but reported 0 (EMA cold-start or feature-disabled) and - // therefore skipped Phase 1 pre-charge entirely. Recorded at Phase 2. + // therefore skipped pre-charge entirely. Recorded at settle time + // (AfterKVRequest). nonprechargeCounter prometheus.Counter nonprechargeActualBytes prometheus.Counter } @@ -164,8 +165,8 @@ func (gmc *groupMetricsCollection) observePagingActual(predicted, actual uint64) // observePagingNonprecharge records one RPC that implemented the predicted // read-bytes interface but reported 0 (EMA cold or feature disabled) and -// therefore ran without Phase 1 pre-charge. `actual` is the response's read -// bytes, settled at Phase 2. Paired with observePagingPrecharge this gives +// therefore ran without pre-charge. `actual` is the response's read bytes, +// settled in AfterKVRequest. Paired with observePagingPrecharge this gives // the cold/ready split and the byte volume that bypassed throttling. func (gmc *groupMetricsCollection) observePagingNonprecharge(actual uint64) { gmc.nonprechargeCounter.Inc() diff --git a/client/resource_group/controller/group_controller_test.go b/client/resource_group/controller/group_controller_test.go index 52b4a5fc224..a1a3622a1d3 100644 --- a/client/resource_group/controller/group_controller_test.go +++ b/client/resource_group/controller/group_controller_test.go @@ -215,9 +215,9 @@ func TestPredictedReadBytesPreCharge(t *testing.T) { cfg := DefaultRUConfig() kvCalc := newKVCalculator(cfg) - // Phase 1: BeforeKVRequest with a PredictedReadBytes hint should - // pre-charge baseCost + predictedReadBytes * ReadBytesCost, regardless - // of whether PagingSizeBytes is set. The two are decoupled. + // BeforeKVRequest with a PredictedReadBytes hint should pre-charge + // baseCost + predictedReadBytes * ReadBytesCost, regardless of + // whether PagingSizeBytes is set. The two are decoupled. pagingSizeBytes := uint64(4 * 1024 * 1024) // protocol-level cap only predictedReadBytes := uint64(256 * 1024) // learned EMA estimate req := &TestRequestInfo{ @@ -225,33 +225,33 @@ func TestPredictedReadBytesPreCharge(t *testing.T) { pagingSizeBytes: pagingSizeBytes, predictedReadBytes: predictedReadBytes, } - phase1 := &rmpb.Consumption{} - kvCalc.BeforeKVRequest(phase1, req) + precharge := &rmpb.Consumption{} + kvCalc.BeforeKVRequest(precharge, req) baseCost := float64(cfg.ReadBaseCost) + float64(cfg.ReadPerBatchBaseCost)*defaultAvgBatchProportion hintCost := float64(cfg.ReadBytesCost) * float64(predictedReadBytes) - re.InDelta(baseCost+hintCost, phase1.RRU, 1e-6, - "Phase 1 should pre-charge based on PredictedReadBytes") + re.InDelta(baseCost+hintCost, precharge.RRU, 1e-6, + "BeforeKVRequest should pre-charge based on PredictedReadBytes") - // Phase 2 should subtract the same hint basis, preserving - // Phase 1 + Phase 2 == baseCost + actualCost. + // AfterKVRequest should subtract the same hint basis, preserving + // precharge + settle == baseCost + actualCost. actualReadBytes := uint64(300 * 1024) // close to the prediction resp := &TestResponseInfo{ readBytes: actualReadBytes, kvCPU: 10 * time.Millisecond, succeed: true, } - phase2 := &rmpb.Consumption{} - kvCalc.AfterKVRequest(phase2, req, resp) + settle := &rmpb.Consumption{} + kvCalc.AfterKVRequest(settle, req, resp) actualReadCost := float64(cfg.ReadBytesCost) * float64(actualReadBytes) cpuCost := float64(cfg.CPUMsCost) * 10.0 - re.InDelta(actualReadCost+cpuCost-hintCost, phase2.RRU, 1e-6, - "Phase 2 should settle using the same hint basis as Phase 1") + re.InDelta(actualReadCost+cpuCost-hintCost, settle.RRU, 1e-6, + "AfterKVRequest should settle using the same hint basis as BeforeKVRequest") - totalRRU := phase1.RRU + phase2.RRU + totalRRU := precharge.RRU + settle.RRU re.InDelta(baseCost+actualReadCost+cpuCost, totalRRU, 1e-6, - "Total RRU across Phase 1+2 should equal baseCost + actualCost") + "Total RRU across precharge+settle should equal baseCost + actualCost") } func TestNoPreChargeWithoutPredictedReadBytes(t *testing.T) { @@ -260,17 +260,17 @@ func TestNoPreChargeWithoutPredictedReadBytes(t *testing.T) { kvCalc := newKVCalculator(cfg) baseCost := float64(cfg.ReadBaseCost) + float64(cfg.ReadPerBatchBaseCost)*defaultAvgBatchProportion - // With PagingSizeBytes but no PredictedReadBytes hint, Phase 1 must - // NOT pre-charge: the two quantities are decoupled. Phase 2 bills the - // actual read bytes only. + // With PagingSizeBytes but no PredictedReadBytes hint, BeforeKVRequest + // must NOT pre-charge: the two quantities are decoupled. AfterKVRequest + // bills the actual read bytes only. pagingSizeBytes := uint64(4 * 1024 * 1024) reqPagingOnly := &TestRequestInfo{ isWrite: false, pagingSizeBytes: pagingSizeBytes, } - phase1 := &rmpb.Consumption{} - kvCalc.BeforeKVRequest(phase1, reqPagingOnly) - re.InDelta(baseCost, phase1.RRU, 1e-6, + precharge := &rmpb.Consumption{} + kvCalc.BeforeKVRequest(precharge, reqPagingOnly) + re.InDelta(baseCost, precharge.RRU, 1e-6, "PagingSizeBytes alone must not trigger pre-charge") actualReadBytes := uint64(2 * 1024 * 1024) @@ -279,20 +279,20 @@ func TestNoPreChargeWithoutPredictedReadBytes(t *testing.T) { kvCPU: 10 * time.Millisecond, succeed: true, } - phase2 := &rmpb.Consumption{} - kvCalc.AfterKVRequest(phase2, reqPagingOnly, resp) + settle := &rmpb.Consumption{} + kvCalc.AfterKVRequest(settle, reqPagingOnly, resp) actualReadCost := float64(cfg.ReadBytesCost) * float64(actualReadBytes) cpuCost := float64(cfg.CPUMsCost) * 10.0 - re.InDelta(actualReadCost+cpuCost, phase2.RRU, 1e-6, - "Phase 2 should bill actual read cost only when nothing was pre-charged") + re.InDelta(actualReadCost+cpuCost, settle.RRU, 1e-6, + "AfterKVRequest should bill actual read cost only when nothing was pre-charged") // Bare request without any hint or paging also pre-charges nothing. reqNone := &TestRequestInfo{isWrite: false} - phase1None := &rmpb.Consumption{} - kvCalc.BeforeKVRequest(phase1None, reqNone) - re.InDelta(baseCost, phase1None.RRU, 1e-6, - "Without hint or paging, Phase 1 should only charge baseCost") + prechargeNone := &rmpb.Consumption{} + kvCalc.BeforeKVRequest(prechargeNone, reqNone) + re.InDelta(baseCost, prechargeNone.RRU, 1e-6, + "Without hint or paging, BeforeKVRequest should only charge baseCost") } func TestPagingPreChargeTokenRefund(t *testing.T) { @@ -439,7 +439,8 @@ func TestNonprechargeMetricsRecordedWhenHintZero(t *testing.T) { actualBytesBefore := testutil.ToFloat64(actualBytes) // Hint=0 read: implements predictedReadBytesProvider but returns 0 -> - // Phase 1 skips pre-charge, Phase 2 must record nonprecharge bytes. + // BeforeKVRequest skips pre-charge, AfterKVRequest must record + // nonprecharge bytes. const readBytesAmount = uint64(256 * 1024) coldReq := &TestRequestInfo{isWrite: false} coldResp := &TestResponseInfo{readBytes: readBytesAmount, succeed: true} @@ -450,7 +451,7 @@ func TestNonprechargeMetricsRecordedWhenHintZero(t *testing.T) { "hint=0 read should increment nonprecharge counter") re.InDelta(bytesBefore+float64(readBytesAmount), testutil.ToFloat64(nonprechargeBytes), 1e-6, - "nonprecharge bytes must equal Phase 2 actual read bytes") + "nonprecharge bytes must equal AfterKVRequest actual read bytes") re.InDelta(prechargeBefore, testutil.ToFloat64(prechargeCounter), 1e-6, "precharge counter must stay unchanged when hint=0") re.InDelta(actualBytesBefore, testutil.ToFloat64(actualBytes), 1e-6, diff --git a/client/resource_group/controller/metrics/metrics.go b/client/resource_group/controller/metrics/metrics.go index 393bebd6be4..5bff7281f9e 100644 --- a/client/resource_group/controller/metrics/metrics.go +++ b/client/resource_group/controller/metrics/metrics.go @@ -70,11 +70,11 @@ var ( // PagingNonprechargeCounter counts RPCs that implemented the predicted // read-bytes interface but reported 0 (e.g. EMA cold-start) and therefore - // ran without Phase 1 pre-charge. Paired with PagingPrechargeCounter this - // yields the cold/ready RPC split from the PD side. + // ran without pre-charge. Paired with PagingPrechargeCounter this yields + // the cold/ready RPC split from the PD side. PagingNonprechargeCounter *prometheus.CounterVec // PagingNonprechargeActualBytes sums the actual read bytes of RPCs that - // skipped pre-charge. Quantifies how much read volume bypassed Phase 1 + // skipped pre-charge. Quantifies how much read volume bypassed pre-charge // throttling — the "cold window" signal for token-bucket pressure // analysis. PagingNonprechargeActualBytes *prometheus.CounterVec @@ -226,7 +226,7 @@ func initMetrics(constLabels prometheus.Labels) { Namespace: namespace, Subsystem: requestSubsystem, Name: "paging_nonprecharge_total", - Help: "Counter of RC paging RPCs that implemented the predicted hint but reported 0 (e.g. EMA cold-start) and ran without Phase 1 pre-charge.", + Help: "Counter of RC paging RPCs that implemented the predicted hint but reported 0 (e.g. EMA cold-start) and ran without pre-charge.", ConstLabels: constLabels, }, []string{newResourceGroupNameLabel}) @@ -235,7 +235,7 @@ func initMetrics(constLabels prometheus.Labels) { Namespace: namespace, Subsystem: requestSubsystem, Name: "paging_nonprecharge_actual_bytes_total", - Help: "Sum of actual bytes read by paging RPCs that skipped pre-charge (hint=0). Quantifies cold-window read volume bypassing Phase 1 throttling.", + Help: "Sum of actual bytes read by paging RPCs that skipped pre-charge (hint=0). Quantifies cold-window read volume bypassing pre-charge throttling.", ConstLabels: constLabels, }, []string{newResourceGroupNameLabel}) } diff --git a/client/resource_group/controller/model.go b/client/resource_group/controller/model.go index 0d37576809a..f540f1a6b87 100644 --- a/client/resource_group/controller/model.go +++ b/client/resource_group/controller/model.go @@ -62,7 +62,7 @@ type RequestInfo interface { // per-logical-scan EMA maintained in TiDB) of how many bytes the request // will read. When present and > 0, it is used as the byte basis for RC // paging pre-charge; when absent or zero the request is not pre-charged -// and will be billed at Phase 2 by actual read bytes only. +// and will be billed in AfterKVRequest by actual read bytes only. // // Defined as an optional interface (not a method on RequestInfo) so older // RequestInfo implementations that have not been updated still compile; @@ -131,10 +131,10 @@ func (kc *KVCalculator) BeforeKVRequest(consumption *rmpb.Consumption, req Reque consumption.KvReadRpcCount += 1 consumption.RRU += float64(kc.ReadBaseCost) + float64(kc.ReadPerBatchBaseCost)*defaultAvgBatchProportion // RC Paging pre-charge: pre-charge the learned read-bytes RU so that - // concurrent workers are throttled at Phase 1 instead of all hitting - // Phase 2 at once. Only applies when the caller supplies a - // PredictedReadBytes hint; without it the request is billed at - // Phase 2 by actual read bytes only. + // concurrent workers are throttled at BeforeKVRequest instead of all + // hitting AfterKVRequest settlement at once. Only applies when the + // caller supplies a PredictedReadBytes hint; without it the request + // is billed in AfterKVRequest by actual read bytes only. if bytesForEst := estimatedReadBytes(req); bytesForEst > 0 { consumption.RRU += float64(kc.ReadBytesCost) * float64(bytesForEst) } @@ -173,7 +173,7 @@ func (kc *KVCalculator) AfterKVRequest(consumption *rmpb.Consumption, req Reques // For now, we can only collect the KV CPU cost for a read request. kc.calculateCPUCost(consumption, res) // RC Paging settlement: subtract the pre-charged bytes RU added in - // BeforeKVRequest so the net total (Phase 1 + Phase 2) equals + // BeforeKVRequest so the net total (pre-charge + settle) equals // baseCost + actualCost. Symmetric with BeforeKVRequest: when no // hint was supplied nothing was pre-charged, so nothing is refunded. if bytesForEst := estimatedReadBytes(req); bytesForEst > 0 {