Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions client/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ require (
github.com/cespare/xxhash/v2 v2.3.0 // indirect
github.com/davecgh/go-spew v1.1.1 // indirect
github.com/golang/protobuf v1.5.4 // indirect
github.com/kylelemons/godebug v1.1.0 // indirect
github.com/munnerz/goautoneg v0.0.0-20191010083416-a7dc8b61c822 // indirect
github.com/pkg/errors v0.9.1 // indirect
github.com/pmezard/go-difflib v1.0.0 // indirect
Expand Down
2 changes: 2 additions & 0 deletions client/go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,8 @@ github.com/kr/pretty v0.3.1/go.mod h1:hoEshYVHaxMs3cyo3Yncou5ZscifuDolrwPKZanG3x
github.com/kr/pty v1.1.1/go.mod h1:pFQYn66WHrOpPYNljwOMqo10TkYh1fy3cYio2l3bCsQ=
github.com/kr/text v0.1.0 h1:45sCR5RtlFHMR4UwH9sdQ5TC8v0qDQCHnXt+kaKSTVE=
github.com/kr/text v0.1.0/go.mod h1:4Jbv+DJW3UT/LiOwJeYQe1efqtUx/iVham/4vfdArNI=
github.com/kylelemons/godebug v1.1.0 h1:RPNrshWIDI6G2gRW9EHilWtl7Z6Sb1BR0xunSBf0SNc=
github.com/kylelemons/godebug v1.1.0/go.mod h1:9/0rRGxNHcop5bhtWyNeEfOS8JIWk580+fNqagV/RAw=
github.com/munnerz/goautoneg v0.0.0-20191010083416-a7dc8b61c822 h1:C3w9PqII01/Oq1c1nUAm88MOHcQC9l5mIlSMApZMrHA=
github.com/munnerz/goautoneg v0.0.0-20191010083416-a7dc8b61c822/go.mod h1:+n7T8mK8HuQTcFwEeznm/DIxMOiR9yIdICNftLE1DvQ=
github.com/nxadm/tail v1.4.8 h1:nPr65rt6Y5JFSKQO7qToXr7pePgD6Gwiw05lkbyAQTE=
Expand Down
94 changes: 83 additions & 11 deletions client/resource_group/controller/group_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -106,6 +106,21 @@ type groupMetricsCollection struct {
tokenRequestCounter prometheus.Counter
runningKVRequestCounter prometheus.Gauge
consumeTokenHistogram prometheus.Observer

// Paging pre-charge observability: cached per-RG so the hot path avoids
// WithLabelValues on every KV request. Only pre-charged requests (those
// with a PredictedReadBytes hint > 0) contribute to these metrics.
prechargeCounter prometheus.Counter
prechargeBytesCounter prometheus.Counter
actualBytesCounter prometheus.Counter
predictionResidualBytes prometheus.Observer

// Nonprecharge bucket: RPCs that implemented the predicted-bytes
// interface but reported 0 (EMA cold-start or feature-disabled) and
// therefore skipped pre-charge entirely. Recorded at settle time
// (AfterKVRequest).
nonprechargeCounter prometheus.Counter
nonprechargeActualBytes prometheus.Counter
}

func initMetrics(oldName, name string) *groupMetricsCollection {
Expand All @@ -122,9 +137,42 @@ func initMetrics(oldName, name string) *groupMetricsCollection {
tokenRequestCounter: metrics.ResourceGroupTokenRequestCounter.WithLabelValues(oldName, name),
runningKVRequestCounter: metrics.GroupRunningKVRequestCounter.WithLabelValues(name),
consumeTokenHistogram: metrics.TokenConsumedHistogram.WithLabelValues(name),

prechargeCounter: metrics.PagingPrechargeCounter.WithLabelValues(name),
prechargeBytesCounter: metrics.PagingPrechargeBytesCounter.WithLabelValues(name),
actualBytesCounter: metrics.PagingActualBytesCounter.WithLabelValues(name),
predictionResidualBytes: metrics.PagingPredictionResidualBytes.WithLabelValues(name),

nonprechargeCounter: metrics.PagingNonprechargeCounter.WithLabelValues(name),
nonprechargeActualBytes: metrics.PagingNonprechargeActualBytes.WithLabelValues(name),
}
}

// observePagingPrecharge records one pre-charge event. Caller must guarantee
// bytesForEst > 0; cold-start requests with no hint are not pre-charged and
// should not be observed here.
func (gmc *groupMetricsCollection) observePagingPrecharge(bytesForEst uint64) {
gmc.prechargeCounter.Inc()
gmc.prechargeBytesCounter.Add(float64(bytesForEst))
}

// observePagingActual records actual read bytes and the signed residual for a
// pre-charged request. Caller must guarantee predicted > 0.
func (gmc *groupMetricsCollection) observePagingActual(predicted, actual uint64) {
gmc.actualBytesCounter.Add(float64(actual))
gmc.predictionResidualBytes.Observe(float64(actual) - float64(predicted))
}

// observePagingNonprecharge records one RPC that implemented the predicted
// read-bytes interface but reported 0 (EMA cold or feature disabled) and
// therefore ran without pre-charge. `actual` is the response's read bytes,
// settled in AfterKVRequest. Paired with observePagingPrecharge this gives
// the cold/ready split and the byte volume that bypassed throttling.
func (gmc *groupMetricsCollection) observePagingNonprecharge(actual uint64) {
gmc.nonprechargeCounter.Inc()
gmc.nonprechargeActualBytes.Add(float64(actual))
}

type tokenCounter struct {
fillRate uint64

Expand Down Expand Up @@ -551,6 +599,9 @@ func (gc *groupCostController) onRequestWaitImpl(
for _, calc := range gc.calculators {
calc.BeforeKVRequest(delta, info)
}
if bytesForEst := estimatedReadBytes(info); bytesForEst > 0 {
gc.metrics.observePagingPrecharge(bytesForEst)
}

gc.mu.Lock()
add(gc.mu.consumption, delta)
Expand Down Expand Up @@ -601,10 +652,19 @@ func (gc *groupCostController) onResponseImpl(
for _, calc := range gc.calculators {
calc.AfterKVRequest(delta, req, resp)
}
if bytesForEst := estimatedReadBytes(req); bytesForEst > 0 {
gc.metrics.observePagingActual(bytesForEst, resp.ReadBytes())
} else if !req.IsWrite() {
if _, ok := req.(predictedReadBytesProvider); ok {
gc.metrics.observePagingNonprecharge(resp.ReadBytes())
}
}
if !gc.burstable.Load() {
counter := gc.run.requestUnitTokens
if v := getRUValueFromConsumption(delta); v > 0 {
counter.limiter.RemoveTokens(time.Now(), v)
} else if v < 0 {
counter.limiter.RefundTokens(time.Now(), -v)
}
}

Expand Down Expand Up @@ -632,21 +692,33 @@ func (gc *groupCostController) onResponseWaitImpl(
for _, calc := range gc.calculators {
calc.AfterKVRequest(delta, req, resp)
}
if bytesForEst := estimatedReadBytes(req); bytesForEst > 0 {
gc.metrics.observePagingActual(bytesForEst, resp.ReadBytes())
} else if !req.IsWrite() {
if _, ok := req.(predictedReadBytesProvider); ok {
gc.metrics.observePagingNonprecharge(resp.ReadBytes())
}
}
var waitDuration time.Duration
if !gc.burstable.Load() {
allowDebt := delta.ReadBytes+delta.WriteBytes < bigRequestThreshold || !gc.isThrottled.Load()
d, err := gc.acquireTokens(ctx, delta, &waitDuration, allowDebt)
if err != nil {
if errs.ErrClientResourceGroupThrottled.Equal(err) {
gc.metrics.failedRequestCounterWithThrottled.Inc()
gc.metrics.failedLimitReserveDuration.Observe(d.Seconds())
} else {
gc.metrics.failedRequestCounterWithOthers.Inc()
v := getRUValueFromConsumption(delta)
if v > 0 {
allowDebt := delta.ReadBytes+delta.WriteBytes < bigRequestThreshold || !gc.isThrottled.Load()
d, err := gc.acquireTokens(ctx, delta, &waitDuration, allowDebt)
if err != nil {
if errs.ErrClientResourceGroupThrottled.Equal(err) {
gc.metrics.failedRequestCounterWithThrottled.Inc()
gc.metrics.failedLimitReserveDuration.Observe(d.Seconds())
} else {
gc.metrics.failedRequestCounterWithOthers.Inc()
}
return nil, waitDuration, err
}
return nil, waitDuration, err
gc.metrics.successfulRequestDuration.Observe(d.Seconds())
waitDuration += d
} else if v < 0 {
gc.run.requestUnitTokens.limiter.RefundTokens(time.Now(), -v)
}
gc.metrics.successfulRequestDuration.Observe(d.Seconds())
waitDuration += d
}

gc.mu.Lock()
Expand Down
Loading
Loading