Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 7 additions & 1 deletion common/dynamicconfig/constants.go
Original file line number Diff line number Diff line change
Expand Up @@ -729,7 +729,7 @@ be 1 or higher.`,
FrontendGlobalNamespaceRPS = NewNamespaceIntSetting(
"frontend.globalNamespaceRPS",
0,
`FrontendGlobalNamespaceRPS is workflow namespace rate limit per second for the whole cluster.
`FrontendGlobalNamespaceRPS is namespace rate limit per second for the whole cluster.
The limit is evenly distributed among available frontend service instances.
If this is set, it overwrites per instance limit "frontend.namespaceRPS".`,
)
Expand Down Expand Up @@ -1546,6 +1546,12 @@ execution is deleted. When enabled, workflow deletions on the active cluster wil
3000,
`HistoryRPS is request rate per second for each history host`,
)
// HistoryGlobalNamespaceRPS is registered under "history.globalNamespaceRPS" with a default of 0.
// Per its description, the limit is shared across history hosts in proportion to the shards each owns.
// NOTE(review): a value of 0 presumably means no global namespace limit is configured — confirm against the consumer.
HistoryGlobalNamespaceRPS = NewNamespaceIntSetting(
"history.globalNamespaceRPS",
0,
`HistoryGlobalNamespaceRPS is namespace rate limit per second for all history hosts.
The limit is distributed among available history service hosts based on their owned shard count`,
)
HistoryPersistenceMaxQPS = NewGlobalIntSetting(
"history.persistenceMaxQPS",
9000,
Expand Down
6 changes: 6 additions & 0 deletions common/quotas/priority.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
package quotas

const (
// OperatorPriority is the priority assigned to operator-originated calls (web UI or tctl)
// so that they take precedence over other traffic.
// NOTE(review): presumably a lower value means higher precedence — confirm against the priority rate limiter.
OperatorPriority = 0
)
27 changes: 27 additions & 0 deletions common/quotas/priority_rate_limiter_impl.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,33 @@ type (

var _ RequestRateLimiter = (*PriorityRateLimiterImpl)(nil)

// NewPriorityRateLimiterHelper builds a priority-based RequestRateLimiter with
// one underlying dynamic rate limiter per entry of prioritiesOrdered.
//
// The limiter registered for OperatorPriority draws its rate and burst from
// NewOperatorRateBurst(rateBurstFn, operatorRPSRatio); every other priority
// uses rateBurstFn directly. All limiters are created with
// defaultRefreshInterval. requestPriorityFn is handed to
// NewPriorityRateLimiter to map each request onto one of these priorities.
func NewPriorityRateLimiterHelper(
	rateBurstFn RateBurst,
	operatorRPSRatio func() float64,
	requestPriorityFn RequestPriorityFn,
	prioritiesOrdered []int,
) RequestRateLimiter {
	limiterByPriority := make(map[int]RequestRateLimiter, len(prioritiesOrdered))
	for _, p := range prioritiesOrdered {
		// Only the operator priority gets the ratio-scaled rate source.
		source := rateBurstFn
		if p == OperatorPriority {
			source = NewOperatorRateBurst(rateBurstFn, operatorRPSRatio)
		}
		limiterByPriority[p] = NewRequestRateLimiterAdapter(
			NewDynamicRateLimiter(source, defaultRefreshInterval),
		)
	}
	return NewPriorityRateLimiter(requestPriorityFn, limiterByPriority)
}

// NewPriorityRateLimiter returns a new rate limiter that can handle dynamic
// configuration updates
func NewPriorityRateLimiter(
Expand Down
78 changes: 77 additions & 1 deletion common/quotas/dynamic.go → common/quotas/rate_burst.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,28 @@ import (
"sync/atomic"
)

// Compile-time assertions that each implementation listed here satisfies RateBurst.
var (
_ RateBurst = (*RateBurstImpl)(nil)
_ RateBurst = (*MutableRateBurstImpl)(nil)
_ RateBurst = (*NamespaceRateBurstImpl)(nil)
_ RateBurst = (*OperatorRateBurstImpl)(nil)
)

// Default burst-ratio providers. Each one returns a package-level default ratio;
// the namespace-aware variants ignore their namespace argument.
var (
// DefaultIncomingBurstRatioFn returns defaultIncomingRateBurstRatio.
DefaultIncomingBurstRatioFn = func() float64 {
return defaultIncomingRateBurstRatio
}
// DefaultOutgoingBurstRatioFn returns defaultOutgoingRateBurstRatio.
DefaultOutgoingBurstRatioFn = func() float64 {
return defaultOutgoingRateBurstRatio
}
// DefaultIncomingNamespaceBurstRatioFn returns defaultIncomingRateBurstRatio for every namespace.
DefaultIncomingNamespaceBurstRatioFn = func(_ string) float64 {
return defaultIncomingRateBurstRatio
}
// DefaultOutgoingNamespaceBurstRatioFn returns defaultOutgoingRateBurstRatio for every namespace.
DefaultOutgoingNamespaceBurstRatioFn = func(_ string) float64 {
return defaultOutgoingRateBurstRatio
}
)

type (
// RateFn returns a float64 as the RPS
RateFn func() float64
Expand All @@ -19,7 +41,10 @@ type (
BurstRatioFn func() float64

// NamespaceBurstFn returns an int as the burst / bucket size for the given namespace
NamespaceBurstFn func(namespace string) float64
NamespaceBurstFn func(namespace string) int

// NamespaceBurstRatioFn returns a float as the ratio of burst to rate for the given namespace
NamespaceBurstRatioFn func(namespace string) float64

// RateBurst returns rate & burst for rate limiter
RateBurst interface {
Expand All @@ -43,6 +68,17 @@ type (
SetBurst(burst int)
RateBurst
}

NamespaceRateBurstImpl struct {
namespaceName string
rateFn NamespaceRateFn
burstFn NamespaceBurstFn
}

OperatorRateBurstImpl struct {
operatorRateRatio func() float64
baseRateBurstFn RateBurst
}
)

func NewRateBurst(
Expand Down Expand Up @@ -128,3 +164,43 @@ func (d *MutableRateBurstImpl) Rate() float64 {
// Burst returns the value currently held in d.burst, converted to int.
func (d *MutableRateBurstImpl) Burst() int {
	current := d.burst.Load()
	return int(current)
}

// NewNamespaceRateBurst returns a rate/burst source bound to namespaceName.
//
// The rate comes straight from rateFn. The burst is derived on every call as
// ceil(rateFn(ns) * burstRatioFn(ns)), with a floor of 1.
func NewNamespaceRateBurst(
	namespaceName string,
	rateFn NamespaceRateFn,
	burstRatioFn NamespaceBurstRatioFn,
) *NamespaceRateBurstImpl {
	deriveBurst := func(ns string) int {
		scaled := math.Ceil(rateFn(ns) * burstRatioFn(ns))
		// Never report a burst below 1, even when the rate or ratio is zero.
		return max(1, int(scaled))
	}
	return &NamespaceRateBurstImpl{
		namespaceName: namespaceName,
		rateFn:        rateFn,
		burstFn:       deriveBurst,
	}
}

// Rate returns the result of rateFn for the namespace this instance is bound to.
func (n *NamespaceRateBurstImpl) Rate() float64 {
	ns := n.namespaceName
	return n.rateFn(ns)
}

// Burst returns the result of burstFn for the namespace this instance is bound to.
func (n *NamespaceRateBurstImpl) Burst() int {
	ns := n.namespaceName
	return n.burstFn(ns)
}

// NewOperatorRateBurst wraps baseRateBurstFn so that its rate is scaled by the
// value operatorRateRatio returns, while its burst is passed through unchanged.
func NewOperatorRateBurst(
	baseRateBurstFn RateBurst,
	operatorRateRatio func() float64,
) *OperatorRateBurstImpl {
	wrapped := new(OperatorRateBurstImpl)
	wrapped.baseRateBurstFn = baseRateBurstFn
	wrapped.operatorRateRatio = operatorRateRatio
	return wrapped
}

// Rate returns the base rate multiplied by the current operator ratio.
// The ratio is read before the base rate on every call.
func (c *OperatorRateBurstImpl) Rate() float64 {
	ratio := c.operatorRateRatio()
	baseRate := c.baseRateBurstFn.Rate()
	return ratio * baseRate
}

// Burst returns the burst of the wrapped base RateBurst; the operator ratio is not applied.
func (c *OperatorRateBurstImpl) Burst() int {
	base := c.baseRateBurstFn
	return base.Burst()
}
155 changes: 42 additions & 113 deletions service/frontend/configs/quotas.go
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
package configs

import (
"math"
"time"

"go.temporal.io/server/common/dynamicconfig"
Expand All @@ -11,11 +10,6 @@ import (
"go.temporal.io/server/common/quotas/calculator"
)

const (
// OperatorPriority is used to give precedence to calls coming from web UI or tctl
OperatorPriority = 0
)

const (
// These names do not map to an underlying gRPC service. This format is used for consistency with the
// gRPC API names on which the authorizer - the consumer of this string - may depend.
Expand Down Expand Up @@ -255,62 +249,6 @@ var (
}
)

type (
NamespaceRateBurstImpl struct {
namespaceName string
rateFn quotas.NamespaceRateFn
burstFn dynamicconfig.IntPropertyFnWithNamespaceFilter
}

operatorRateBurstImpl struct {
operatorRateRatio dynamicconfig.FloatPropertyFn
baseRateBurstFn quotas.RateBurst
}
)

var _ quotas.RateBurst = (*NamespaceRateBurstImpl)(nil)
var _ quotas.RateBurst = (*operatorRateBurstImpl)(nil)

func NewNamespaceRateBurst(
namespaceName string,
rateFn quotas.NamespaceRateFn,
burstRatioFn dynamicconfig.FloatPropertyFnWithNamespaceFilter,
) *NamespaceRateBurstImpl {
return &NamespaceRateBurstImpl{
namespaceName: namespaceName,
rateFn: rateFn,
burstFn: func(namespace string) int {
return max(1, int(math.Ceil(rateFn(namespace)*burstRatioFn(namespace))))
},
}
}

func (c *NamespaceRateBurstImpl) Rate() float64 {
return c.rateFn(c.namespaceName)
}

func (c *NamespaceRateBurstImpl) Burst() int {
return c.burstFn(c.namespaceName)
}

func newOperatorRateBurst(
baseRateBurstFn quotas.RateBurst,
operatorRateRatio dynamicconfig.FloatPropertyFn,
) *operatorRateBurstImpl {
return &operatorRateBurstImpl{
operatorRateRatio: operatorRateRatio,
baseRateBurstFn: baseRateBurstFn,
}
}

func (c *operatorRateBurstImpl) Rate() float64 {
return c.operatorRateRatio() * c.baseRateBurstFn.Rate()
}

func (c *operatorRateBurstImpl) Burst() int {
return c.baseRateBurstFn.Burst()
}

func NewRequestToRateLimiter(
executionRateBurstFn quotas.RateBurst,
visibilityRateBurstFn quotas.RateBurst,
Expand Down Expand Up @@ -340,69 +278,60 @@ func NewExecutionPriorityRateLimiter(
rateBurstFn quotas.RateBurst,
operatorRPSRatio dynamicconfig.FloatPropertyFn,
) quotas.RequestRateLimiter {
rateLimiters := make(map[int]quotas.RequestRateLimiter)
for priority := range ExecutionAPIPrioritiesOrdered {
if priority == OperatorPriority {
rateLimiters[priority] = quotas.NewRequestRateLimiterAdapter(quotas.NewDynamicRateLimiter(newOperatorRateBurst(rateBurstFn, operatorRPSRatio), time.Minute))
} else {
rateLimiters[priority] = quotas.NewRequestRateLimiterAdapter(quotas.NewDynamicRateLimiter(rateBurstFn, time.Minute))
}
}
return quotas.NewPriorityRateLimiter(func(req quotas.Request) int {
if req.CallerType == headers.CallerTypeOperator {
return OperatorPriority
}
if priority, ok := APIToPriority[req.API]; ok {
return priority
}
return ExecutionAPIPrioritiesOrdered[len(ExecutionAPIPrioritiesOrdered)-1]
}, rateLimiters)
return quotas.NewPriorityRateLimiterHelper(
rateBurstFn,
operatorRPSRatio,
func(req quotas.Request) int {
if req.CallerType == headers.CallerTypeOperator {
return quotas.OperatorPriority
}
if priority, ok := APIToPriority[req.API]; ok {
return priority
}
return ExecutionAPIPrioritiesOrdered[len(ExecutionAPIPrioritiesOrdered)-1]
},
ExecutionAPIPrioritiesOrdered,
)
}

// NewVisibilityPriorityRateLimiter builds the priority rate limiter for
// visibility APIs on top of quotas.NewPriorityRateLimiterHelper, using
// VisibilityAPIPrioritiesOrdered as the set of priorities.
//
// Request priority is resolved as follows:
//   - requests whose CallerType is headers.CallerTypeOperator get quotas.OperatorPriority;
//   - otherwise the request API is looked up in VisibilityAPIToPriority;
//   - APIs missing from that map fall back to the last entry of
//     VisibilityAPIPrioritiesOrdered.
func NewVisibilityPriorityRateLimiter(
	rateBurstFn quotas.RateBurst,
	operatorRPSRatio dynamicconfig.FloatPropertyFn,
) quotas.RequestRateLimiter {
	return quotas.NewPriorityRateLimiterHelper(
		rateBurstFn,
		operatorRPSRatio,
		func(req quotas.Request) int {
			if req.CallerType == headers.CallerTypeOperator {
				return quotas.OperatorPriority
			}
			if priority, ok := VisibilityAPIToPriority[req.API]; ok {
				return priority
			}
			return VisibilityAPIPrioritiesOrdered[len(VisibilityAPIPrioritiesOrdered)-1]
		},
		VisibilityAPIPrioritiesOrdered,
	)
}

// NewNamespaceReplicationInducingAPIPriorityRateLimiter builds the priority
// rate limiter for namespace-replication-inducing APIs on top of
// quotas.NewPriorityRateLimiterHelper, using
// NamespaceReplicationInducingAPIPrioritiesOrdered as the set of priorities.
//
// Request priority is resolved as follows:
//   - requests whose CallerType is headers.CallerTypeOperator get quotas.OperatorPriority;
//   - otherwise the request API is looked up in NamespaceReplicationInducingAPIToPriority;
//   - APIs missing from that map fall back to the last entry of
//     NamespaceReplicationInducingAPIPrioritiesOrdered.
func NewNamespaceReplicationInducingAPIPriorityRateLimiter(
	rateBurstFn quotas.RateBurst,
	operatorRPSRatio dynamicconfig.FloatPropertyFn,
) quotas.RequestRateLimiter {
	return quotas.NewPriorityRateLimiterHelper(
		rateBurstFn,
		operatorRPSRatio,
		func(req quotas.Request) int {
			if req.CallerType == headers.CallerTypeOperator {
				return quotas.OperatorPriority
			}
			if priority, ok := NamespaceReplicationInducingAPIToPriority[req.API]; ok {
				return priority
			}
			return NamespaceReplicationInducingAPIPrioritiesOrdered[len(NamespaceReplicationInducingAPIPrioritiesOrdered)-1]
		},
		NamespaceReplicationInducingAPIPrioritiesOrdered,
	)
}

// NewGlobalNamespaceRateLimiter creates a namespace-aware rate limiter that uses
Expand Down
Loading
Loading