From c754756d4b1f8a42990c901052c28e20c78e8285 Mon Sep 17 00:00:00 2001 From: Yichao Yang Date: Wed, 8 Apr 2026 19:11:21 -0700 Subject: [PATCH] Add per namespace rate limiter on history service --- common/dynamicconfig/constants.go | 8 +- common/quotas/priority.go | 6 + common/quotas/priority_rate_limiter_impl.go | 27 ++++ common/quotas/{dynamic.go => rate_burst.go} | 78 +++++++++- service/frontend/configs/quotas.go | 155 ++++++-------------- service/frontend/fx.go | 22 ++- service/fx.go | 41 +++--- service/history/configs/config.go | 2 + service/history/configs/quotas.go | 61 ++++---- service/history/fx.go | 45 +++++- service/matching/fx.go | 4 +- service/worker/fx.go | 4 +- 12 files changed, 280 insertions(+), 173 deletions(-) create mode 100644 common/quotas/priority.go rename common/quotas/{dynamic.go => rate_burst.go} (55%) diff --git a/common/dynamicconfig/constants.go b/common/dynamicconfig/constants.go index 9372873d33..bc3d3a8395 100644 --- a/common/dynamicconfig/constants.go +++ b/common/dynamicconfig/constants.go @@ -729,7 +729,7 @@ be 1 or higher.`, FrontendGlobalNamespaceRPS = NewNamespaceIntSetting( "frontend.globalNamespaceRPS", 0, - `FrontendGlobalNamespaceRPS is workflow namespace rate limit per second for the whole cluster. + `FrontendGlobalNamespaceRPS is namespace rate limit per second for the whole cluster. The limit is evenly distributed among available frontend service instances. If this is set, it overwrites per instance limit "frontend.namespaceRPS".`, ) @@ -1546,6 +1546,12 @@ execution is deleted. When enabled, workflow deletions on the active cluster wil 3000, `HistoryRPS is request rate per second for each history host`, ) + HistoryGlobalNamespaceRPS = NewNamespaceIntSetting( + "history.globalNamespaceRPS", + 0, + `HistoryGlobalNamespaceRPS is namespace rate limit per second for all history hosts. +The limit is distributed among available history service hosts based on their owned shard count`, + ) HistoryPersistenceMaxQPS = NewGlobalIntSetting( "history.persistenceMaxQPS", 9000, diff --git a/common/quotas/priority.go b/common/quotas/priority.go new file mode 100644 index 0000000000..ce2b03d179 --- /dev/null +++ b/common/quotas/priority.go @@ -0,0 +1,6 @@ +package quotas + +const ( + // OperatorPriority is used to give precedence to calls coming from web UI or tctl + OperatorPriority = 0 +) diff --git a/common/quotas/priority_rate_limiter_impl.go b/common/quotas/priority_rate_limiter_impl.go index 802a28a046..e3e48d00ac 100644 --- a/common/quotas/priority_rate_limiter_impl.go +++ b/common/quotas/priority_rate_limiter_impl.go @@ -22,6 +22,33 @@ type ( var _ RequestRateLimiter = (*PriorityRateLimiterImpl)(nil) +func NewPriorityRateLimiterHelper( + rateBurstFn RateBurst, + operatorRPSRatio func() float64, + requestPriorityFn RequestPriorityFn, + prioritiesOrdered []int, +) RequestRateLimiter { + rateLimiters := make(map[int]RequestRateLimiter) + for _, priority := range prioritiesOrdered { + if priority == OperatorPriority { + rateLimiters[priority] = NewRequestRateLimiterAdapter( + NewDynamicRateLimiter( + NewOperatorRateBurst(rateBurstFn, operatorRPSRatio), + defaultRefreshInterval, + ), + ) + } else { + rateLimiters[priority] = NewRequestRateLimiterAdapter( + NewDynamicRateLimiter( + rateBurstFn, + defaultRefreshInterval, + ), + ) + } + } + return NewPriorityRateLimiter(requestPriorityFn, rateLimiters) +} + // NewPriorityRateLimiter returns a new rate limiter that can handle dynamic // configuration updates func NewPriorityRateLimiter( diff --git a/common/quotas/dynamic.go b/common/quotas/rate_burst.go similarity index 55% rename from common/quotas/dynamic.go rename to common/quotas/rate_burst.go index e27ef3ff21..ccd8982e8a 100644 --- a/common/quotas/dynamic.go +++ b/common/quotas/rate_burst.go @@ -5,6 +5,28 @@ import ( "sync/atomic" ) +var ( + _ RateBurst = (*RateBurstImpl)(nil) + _ RateBurst = (*MutableRateBurstImpl)(nil) + _ RateBurst = (*NamespaceRateBurstImpl)(nil) + _ RateBurst = (*OperatorRateBurstImpl)(nil) +) + +var ( + DefaultIncomingBurstRatioFn = func() float64 { + return defaultIncomingRateBurstRatio + } + DefaultOutgoingBurstRatioFn = func() float64 { + return defaultOutgoingRateBurstRatio + } + DefaultIncomingNamespaceBurstRatioFn = func(_ string) float64 { + return defaultIncomingRateBurstRatio + } + DefaultOutgoingNamespaceBurstRatioFn = func(_ string) float64 { + return defaultOutgoingRateBurstRatio + } +) + type ( // RateFn returns a float64 as the RPS RateFn func() float64 @@ -19,7 +41,10 @@ type ( BurstRatioFn func() float64 // NamespaceBurstFn returns an int as the burst / bucket size for the given namespace - NamespaceBurstFn func(namespace string) float64 + NamespaceBurstFn func(namespace string) int + + // NamespaceBurstRatioFn returns a float as the ratio of burst to rate for the given namespace + NamespaceBurstRatioFn func(namespace string) float64 // RateBurst returns rate & burst for rate limiter RateBurst interface { @@ -43,6 +68,17 @@ type ( SetBurst(burst int) RateBurst } + + NamespaceRateBurstImpl struct { + namespaceName string + rateFn NamespaceRateFn + burstFn NamespaceBurstFn + } + + OperatorRateBurstImpl struct { + operatorRateRatio func() float64 + baseRateBurstFn RateBurst + } ) func NewRateBurst( @@ -128,3 +164,43 @@ func (d *MutableRateBurstImpl) Rate() float64 { func (d *MutableRateBurstImpl) Burst() int { return int(d.burst.Load()) } + +func NewNamespaceRateBurst( + namespaceName string, + rateFn NamespaceRateFn, + burstRatioFn NamespaceBurstRatioFn, +) *NamespaceRateBurstImpl { + return &NamespaceRateBurstImpl{ + namespaceName: namespaceName, + rateFn: rateFn, + burstFn: func(namespace string) int { + return max(1, int(math.Ceil(rateFn(namespace)*burstRatioFn(namespace)))) + }, + } +} + +func (n *NamespaceRateBurstImpl) Rate() float64 { + return n.rateFn(n.namespaceName) +} + +func (n *NamespaceRateBurstImpl) Burst() int { + return n.burstFn(n.namespaceName) +} + +func NewOperatorRateBurst( + baseRateBurstFn RateBurst, + operatorRateRatio func() float64, +) *OperatorRateBurstImpl { + return &OperatorRateBurstImpl{ + operatorRateRatio: operatorRateRatio, + baseRateBurstFn: baseRateBurstFn, + } +} + +func (c *OperatorRateBurstImpl) Rate() float64 { + return c.operatorRateRatio() * c.baseRateBurstFn.Rate() +} + +func (c *OperatorRateBurstImpl) Burst() int { + return c.baseRateBurstFn.Burst() +} diff --git a/service/frontend/configs/quotas.go b/service/frontend/configs/quotas.go index bef6821cbc..25947ddfd8 100644 --- a/service/frontend/configs/quotas.go +++ b/service/frontend/configs/quotas.go @@ -1,7 +1,6 @@ package configs import ( - "math" "time" "go.temporal.io/server/common/dynamicconfig" @@ -11,11 +10,6 @@ import ( "go.temporal.io/server/common/quotas/calculator" ) -const ( - // OperatorPriority is used to give precedence to calls coming from web UI or tctl - OperatorPriority = 0 -) - const ( // These names do not map to an underlying gRPC service. This format is used for consistency with the // gRPC API names on which the authorizer - the consumer of this string - may depend. @@ -255,62 +249,6 @@ var ( } ) -type ( - NamespaceRateBurstImpl struct { - namespaceName string - rateFn quotas.NamespaceRateFn - burstFn dynamicconfig.IntPropertyFnWithNamespaceFilter - } - - operatorRateBurstImpl struct { - operatorRateRatio dynamicconfig.FloatPropertyFn - baseRateBurstFn quotas.RateBurst - } -) - -var _ quotas.RateBurst = (*NamespaceRateBurstImpl)(nil) -var _ quotas.RateBurst = (*operatorRateBurstImpl)(nil) - -func NewNamespaceRateBurst( - namespaceName string, - rateFn quotas.NamespaceRateFn, - burstRatioFn dynamicconfig.FloatPropertyFnWithNamespaceFilter, -) *NamespaceRateBurstImpl { - return &NamespaceRateBurstImpl{ - namespaceName: namespaceName, - rateFn: rateFn, - burstFn: func(namespace string) int { - return max(1, int(math.Ceil(rateFn(namespace)*burstRatioFn(namespace)))) - }, - } -} - -func (c *NamespaceRateBurstImpl) Rate() float64 { - return c.rateFn(c.namespaceName) -} - -func (c *NamespaceRateBurstImpl) Burst() int { - return c.burstFn(c.namespaceName) -} - -func newOperatorRateBurst( - baseRateBurstFn quotas.RateBurst, - operatorRateRatio dynamicconfig.FloatPropertyFn, -) *operatorRateBurstImpl { - return &operatorRateBurstImpl{ - operatorRateRatio: operatorRateRatio, - baseRateBurstFn: baseRateBurstFn, - } -} - -func (c *operatorRateBurstImpl) Rate() float64 { - return c.operatorRateRatio() * c.baseRateBurstFn.Rate() -} - -func (c *operatorRateBurstImpl) Burst() int { - return c.baseRateBurstFn.Burst() -} - func NewRequestToRateLimiter( executionRateBurstFn quotas.RateBurst, visibilityRateBurstFn quotas.RateBurst, @@ -340,69 +278,60 @@ func NewExecutionPriorityRateLimiter( rateBurstFn quotas.RateBurst, operatorRPSRatio dynamicconfig.FloatPropertyFn, ) quotas.RequestRateLimiter { - rateLimiters := make(map[int]quotas.RequestRateLimiter) - for priority := range ExecutionAPIPrioritiesOrdered { - if priority == OperatorPriority { - rateLimiters[priority] = quotas.NewRequestRateLimiterAdapter(quotas.NewDynamicRateLimiter(newOperatorRateBurst(rateBurstFn, operatorRPSRatio), time.Minute)) - } else { - rateLimiters[priority] = quotas.NewRequestRateLimiterAdapter(quotas.NewDynamicRateLimiter(rateBurstFn, time.Minute)) - } - } - return quotas.NewPriorityRateLimiter(func(req quotas.Request) int { - if req.CallerType == headers.CallerTypeOperator { - return OperatorPriority - } - if priority, ok := APIToPriority[req.API]; ok { - return priority - } - return ExecutionAPIPrioritiesOrdered[len(ExecutionAPIPrioritiesOrdered)-1] - }, rateLimiters) + return quotas.NewPriorityRateLimiterHelper( + rateBurstFn, + operatorRPSRatio, + func(req quotas.Request) int { + if req.CallerType == headers.CallerTypeOperator { + return quotas.OperatorPriority + } + if priority, ok := APIToPriority[req.API]; ok { + return priority + } + return ExecutionAPIPrioritiesOrdered[len(ExecutionAPIPrioritiesOrdered)-1] + }, + ExecutionAPIPrioritiesOrdered, + ) } func NewVisibilityPriorityRateLimiter( rateBurstFn quotas.RateBurst, operatorRPSRatio dynamicconfig.FloatPropertyFn, ) quotas.RequestRateLimiter { - rateLimiters := make(map[int]quotas.RequestRateLimiter) - for priority := range VisibilityAPIPrioritiesOrdered { - if priority == OperatorPriority { - rateLimiters[priority] = quotas.NewRequestRateLimiterAdapter(quotas.NewDynamicRateLimiter(newOperatorRateBurst(rateBurstFn, operatorRPSRatio), time.Minute)) - } else { - rateLimiters[priority] = quotas.NewRequestRateLimiterAdapter(quotas.NewDynamicRateLimiter(rateBurstFn, time.Minute)) - } - } - return quotas.NewPriorityRateLimiter(func(req quotas.Request) int { - if req.CallerType == headers.CallerTypeOperator { - return OperatorPriority - } - if priority, ok := VisibilityAPIToPriority[req.API]; ok { - return priority - } - return VisibilityAPIPrioritiesOrdered[len(VisibilityAPIPrioritiesOrdered)-1] - }, rateLimiters) + return quotas.NewPriorityRateLimiterHelper( + rateBurstFn, + operatorRPSRatio, + func(req quotas.Request) int { + if req.CallerType == headers.CallerTypeOperator { + return quotas.OperatorPriority + } + if priority, ok := VisibilityAPIToPriority[req.API]; ok { + return priority + } + return VisibilityAPIPrioritiesOrdered[len(VisibilityAPIPrioritiesOrdered)-1] + }, + VisibilityAPIPrioritiesOrdered, + ) } func NewNamespaceReplicationInducingAPIPriorityRateLimiter( rateBurstFn quotas.RateBurst, operatorRPSRatio dynamicconfig.FloatPropertyFn, ) quotas.RequestRateLimiter { - rateLimiters := make(map[int]quotas.RequestRateLimiter) - for priority := range NamespaceReplicationInducingAPIPrioritiesOrdered { - if priority == OperatorPriority { - rateLimiters[priority] = quotas.NewRequestRateLimiterAdapter(quotas.NewDynamicRateLimiter(newOperatorRateBurst(rateBurstFn, operatorRPSRatio), time.Minute)) - } else { - rateLimiters[priority] = quotas.NewRequestRateLimiterAdapter(quotas.NewDynamicRateLimiter(rateBurstFn, time.Minute)) - } - } - return quotas.NewPriorityRateLimiter(func(req quotas.Request) int { - if req.CallerType == headers.CallerTypeOperator { - return OperatorPriority - } - if priority, ok := NamespaceReplicationInducingAPIToPriority[req.API]; ok { - return priority - } - return NamespaceReplicationInducingAPIPrioritiesOrdered[len(NamespaceReplicationInducingAPIPrioritiesOrdered)-1] - }, rateLimiters) + return quotas.NewPriorityRateLimiterHelper( + rateBurstFn, + operatorRPSRatio, + func(req quotas.Request) int { + if req.CallerType == headers.CallerTypeOperator { + return quotas.OperatorPriority + } + if priority, ok := NamespaceReplicationInducingAPIToPriority[req.API]; ok { + return priority + } + return NamespaceReplicationInducingAPIPrioritiesOrdered[len(NamespaceReplicationInducingAPIPrioritiesOrdered)-1] + }, + NamespaceReplicationInducingAPIPrioritiesOrdered, + ) } // NewGlobalNamespaceRateLimiter creates a namespace-aware rate limiter that uses diff --git a/service/frontend/fx.go b/service/frontend/fx.go index e18524006f..e67ab198d4 100644 --- a/service/frontend/fx.go +++ b/service/frontend/fx.go @@ -95,7 +95,7 @@ var Module = fx.Options( fx.Provide(VisibilityManagerProvider), fx.Provide(ThrottledLoggerRpsFnProvider), fx.Provide(PersistenceRateLimitingParamsProvider), - service.PersistenceLazyLoadedServiceResolverModule, + service.LazyLoadedServiceResolverModule, fx.Provide(FEReplicatorNamespaceReplicationQueueProvider), fx.Provide(nsreplication.NewNoopDataMerger), fx.Provide(AuthorizationInterceptorProvider), @@ -529,9 +529,21 @@ func NamespaceRateLimitInterceptorProvider( namespaceRateLimiter := quotas.NewNamespaceRequestRateLimiter( func(req quotas.Request) quotas.RequestRateLimiter { return configs.NewRequestToRateLimiter( - configs.NewNamespaceRateBurst(req.Caller, namespaceRateFn, serviceConfig.MaxNamespaceBurstRatioPerInstance), - configs.NewNamespaceRateBurst(req.Caller, visibilityRateFn, serviceConfig.MaxNamespaceVisibilityBurstRatioPerInstance), - configs.NewNamespaceRateBurst(req.Caller, namespaceReplicationInducingRateFn, serviceConfig.MaxNamespaceNamespaceReplicationInducingAPIsBurstRatioPerInstance), + quotas.NewNamespaceRateBurst( + req.Caller, + namespaceRateFn, + quotas.NamespaceBurstRatioFn(serviceConfig.MaxNamespaceBurstRatioPerInstance), + ), + quotas.NewNamespaceRateBurst( + req.Caller, + visibilityRateFn, + quotas.NamespaceBurstRatioFn(serviceConfig.MaxNamespaceVisibilityBurstRatioPerInstance), + ), + quotas.NewNamespaceRateBurst( + req.Caller, + namespaceReplicationInducingRateFn, + quotas.NamespaceBurstRatioFn(serviceConfig.MaxNamespaceNamespaceReplicationInducingAPIsBurstRatioPerInstance), + ), serviceConfig.OperatorRPSRatio, ) }, @@ -588,7 +600,7 @@ func SlowRequestLoggerInterceptorProvider( func PersistenceRateLimitingParamsProvider( serviceConfig *Config, - persistenceLazyLoadedServiceResolver service.PersistenceLazyLoadedServiceResolver, + persistenceLazyLoadedServiceResolver service.LazyLoadedServiceResolver, logger log.SnTaggedLogger, ) service.PersistenceRateLimitingParams { return service.NewPersistenceRateLimitingParams( diff --git a/service/fx.go b/service/fx.go index 9f180a2bb8..75988e0da6 100644 --- a/service/fx.go +++ b/service/fx.go @@ -20,7 +20,7 @@ import ( ) type ( - PersistenceLazyLoadedServiceResolver struct { + LazyLoadedServiceResolver struct { *atomic.Value // value type is membership.ServiceResolver } @@ -38,38 +38,39 @@ type ( GrpcServerOptionsParams struct { fx.In - Logger log.Logger - RPCFactory common.RPCFactory - RetryableInterceptor *interceptor.RetryableInterceptor - TelemetryInterceptor *interceptor.TelemetryInterceptor - RateLimitInterceptor *interceptor.RateLimitInterceptor - TracingStatsHandler telemetry.ServerStatsHandler - MetricsStatsHandler metrics.ServerStatsHandler - AdditionalInterceptors []grpc.UnaryServerInterceptor `optional:"true"` - AdditionalStreamInterceptors []grpc.StreamServerInterceptor `optional:"true"` + Logger log.Logger + RPCFactory common.RPCFactory + RetryableInterceptor *interceptor.RetryableInterceptor + TelemetryInterceptor *interceptor.TelemetryInterceptor + NamespaceRateLimitInterceptor interceptor.NamespaceRateLimitInterceptor `optional:"true"` + RateLimitInterceptor *interceptor.RateLimitInterceptor + TracingStatsHandler telemetry.ServerStatsHandler + MetricsStatsHandler metrics.ServerStatsHandler + AdditionalInterceptors []grpc.UnaryServerInterceptor `optional:"true"` + AdditionalStreamInterceptors []grpc.StreamServerInterceptor `optional:"true"` } ) -var PersistenceLazyLoadedServiceResolverModule = fx.Options( - fx.Provide(func() PersistenceLazyLoadedServiceResolver { - return PersistenceLazyLoadedServiceResolver{ +var LazyLoadedServiceResolverModule = fx.Options( + fx.Provide(func() LazyLoadedServiceResolver { + return LazyLoadedServiceResolver{ Value: &atomic.Value{}, } }), - fx.Invoke(initPersistenceLazyLoadedServiceResolver), + fx.Invoke(initLazyLoadedServiceResolver), ) -func initPersistenceLazyLoadedServiceResolver( +func initLazyLoadedServiceResolver( serviceName primitives.ServiceName, logger log.SnTaggedLogger, serviceResolver membership.ServiceResolver, - lazyLoadedServiceResolver PersistenceLazyLoadedServiceResolver, + lazyLoadedServiceResolver LazyLoadedServiceResolver, ) { lazyLoadedServiceResolver.Store(serviceResolver) logger.Info("Initialized service resolver for persistence rate limiting", tag.Service(serviceName)) } -func (p PersistenceLazyLoadedServiceResolver) AvailableMemberCount() int { +func (p LazyLoadedServiceResolver) AvailableMemberCount() int { if value := p.Load(); value != nil { return value.(membership.ServiceResolver).AvailableMemberCount() } @@ -85,7 +86,7 @@ func NewPersistenceRateLimitingParams( operatorRPSRatio dynamicconfig.FloatPropertyFn, burstRatio dynamicconfig.FloatPropertyFn, dynamicRateLimitingParams dynamicconfig.TypedPropertyFn[dynamicconfig.DynamicRateLimitingParams], - lazyLoadedServiceResolver PersistenceLazyLoadedServiceResolver, + lazyLoadedServiceResolver LazyLoadedServiceResolver, logger log.Logger, ) PersistenceRateLimitingParams { hostCalculator := calculator.NewLoggedCalculator( @@ -163,6 +164,10 @@ func getUnaryInterceptors(params GrpcServerOptionsParams) []grpc.UnaryServerInte interceptors = append(interceptors, params.AdditionalInterceptors...) + if params.NamespaceRateLimitInterceptor != nil { + interceptors = append(interceptors, params.NamespaceRateLimitInterceptor.Intercept) + } + return append( interceptors, params.RateLimitInterceptor.Intercept, diff --git a/service/history/configs/config.go b/service/history/configs/config.go index c74f5630b3..d685c29263 100644 --- a/service/history/configs/config.go +++ b/service/history/configs/config.go @@ -16,6 +16,7 @@ type Config struct { HistoryReplicationDLQV2 dynamicconfig.BoolPropertyFn RPS dynamicconfig.IntPropertyFn + GlobalNamespaceRPS dynamicconfig.IntPropertyFnWithNamespaceFilter OperatorRPSRatio dynamicconfig.FloatPropertyFn MaxIDLengthLimit dynamicconfig.IntPropertyFn PersistenceMaxQPS dynamicconfig.IntPropertyFn @@ -435,6 +436,7 @@ func NewConfig( HistoryReplicationDLQV2: dynamicconfig.EnableHistoryReplicationDLQV2.Get(dc), RPS: dynamicconfig.HistoryRPS.Get(dc), + GlobalNamespaceRPS: dynamicconfig.HistoryGlobalNamespaceRPS.Get(dc), OperatorRPSRatio: dynamicconfig.OperatorRPSRatio.Get(dc), MaxIDLengthLimit: dynamicconfig.MaxIDLengthLimit.Get(dc), PersistenceMaxQPS: dynamicconfig.HistoryPersistenceMaxQPS.Get(dc), diff --git a/service/history/configs/quotas.go b/service/history/configs/quotas.go index e4c2fd8388..517918e34c 100644 --- a/service/history/configs/quotas.go +++ b/service/history/configs/quotas.go @@ -6,49 +6,56 @@ import ( "go.temporal.io/server/common/quotas" ) -const ( - // OperatorPriority is used to give precedence to calls coming from web UI or tctl - OperatorPriority = 0 -) - var ( CallerTypeToPriority = map[string]int{ - headers.CallerTypeOperator: OperatorPriority, + headers.CallerTypeOperator: quotas.OperatorPriority, headers.CallerTypeAPI: 1, headers.CallerTypeBackgroundHigh: 2, headers.CallerTypeBackgroundLow: 3, headers.CallerTypePreemptable: 4, } - APIPrioritiesOrdered = []int{OperatorPriority, 1, 2, 3, 4} + APIPrioritiesOrdered = []int{quotas.OperatorPriority, 1, 2, 3, 4} ) func NewPriorityRateLimiter( rateFn quotas.RateFn, operatorRPSRatio dynamicconfig.FloatPropertyFn, ) quotas.RequestRateLimiter { - rateLimiters := make(map[int]quotas.RequestRateLimiter) - for priority := range APIPrioritiesOrdered { - if priority == OperatorPriority { - rateLimiters[priority] = quotas.NewRequestRateLimiterAdapter(quotas.NewDefaultIncomingRateLimiter(operatorRateFn(rateFn, operatorRPSRatio))) - } else { - rateLimiters[priority] = quotas.NewRequestRateLimiterAdapter(quotas.NewDefaultIncomingRateLimiter(rateFn)) - } - } - return quotas.NewPriorityRateLimiter(func(req quotas.Request) int { - if priority, ok := CallerTypeToPriority[req.CallerType]; ok { - return priority - } - // unknown caller type, default to api to be consistent with existing behavior - return CallerTypeToPriority[headers.CallerTypeAPI] - }, rateLimiters) + return quotas.NewPriorityRateLimiterHelper( + quotas.NewDefaultIncomingRateBurst(rateFn), + operatorRPSRatio, + RequestToPriority, + APIPrioritiesOrdered, + ) } -func operatorRateFn( - rateFn quotas.RateFn, +func NewNamespapceRateLimiter( + namespaceRateFn quotas.NamespaceRateFn, operatorRPSRatio dynamicconfig.FloatPropertyFn, -) quotas.RateFn { - return func() float64 { - return operatorRPSRatio() * rateFn() +) quotas.RequestRateLimiter { + return quotas.NewNamespaceRequestRateLimiter( + func(req quotas.Request) quotas.RequestRateLimiter { + return quotas.NewPriorityRateLimiterHelper( + quotas.NewNamespaceRateBurst( + req.Caller, + namespaceRateFn, + // TODO: We can consider adding a separate burst ratio dynamic config + // on namespace level rate limiter if needed. + quotas.DefaultIncomingNamespaceBurstRatioFn, + ), + operatorRPSRatio, + RequestToPriority, + APIPrioritiesOrdered, + ) + }, + ) +} + +func RequestToPriority(req quotas.Request) int { + if priority, ok := CallerTypeToPriority[req.CallerType]; ok { + return priority } + // unknown caller type, default to api to be consistent with existing behavior + return CallerTypeToPriority[headers.CallerTypeAPI] } diff --git a/service/history/fx.go b/service/history/fx.go index 7f57e2ab90..73eeb8c979 100644 --- a/service/history/fx.go +++ b/service/history/fx.go @@ -65,6 +65,7 @@ var Module = fx.Options( fx.Provide(RetryableInterceptorProvider), fx.Provide(ErrorHandlerProvider), fx.Provide(TelemetryInterceptorProvider), + fx.Provide(NamespaceRateLimitInterceptorProvider), fx.Provide(RateLimitInterceptorProvider), fx.Provide(HealthSignalAggregatorProvider), fx.Provide(HealthCheckInterceptorProvider), @@ -78,7 +79,7 @@ var Module = fx.Options( fx.Provide(visibility.ChasmVisibilityManagerProvider), fx.Provide(ThrottledLoggerRpsFnProvider), fx.Provide(PersistenceRateLimitingParamsProvider), - service.PersistenceLazyLoadedServiceResolverModule, + service.LazyLoadedServiceResolverModule, fx.Provide(ServiceResolverProvider), fx.Provide(EventNotifierProvider), fx.Provide(HistoryEngineFactoryProvider), @@ -262,6 +263,42 @@ func HistoryAdditionalInterceptorsProvider( } } +func NamespaceRateLimitInterceptorProvider( + serviceConfig *configs.Config, + namespaceRegistry namespace.Registry, + lazyLoadedServiceResolver service.LazyLoadedServiceResolver, + ownershipBasedQuotaScaler shard.LazyLoadedOwnershipBasedQuotaScaler, + metricsHandler metrics.Handler, + logger log.SnTaggedLogger, +) interceptor.NamespaceRateLimitInterceptor { + + namespaceRateFn := calculator.NewLoggedNamespaceCalculator( + shard.NewOwnershipAwareNamespaceQuotaCalculator( + ownershipBasedQuotaScaler, + lazyLoadedServiceResolver, + // TODO: This fallbacks to host level rps when GlobalNamespaceRPS is not configured (i.e. 0), + // or shard ownership information is not available. + // We can consider adding a separate per namespace per instance rps dynamic config + // for this fallback behavior if needed. + func(_ string) int { return serviceConfig.RPS() }, + serviceConfig.GlobalNamespaceRPS, + ), + logger, + ).GetQuota + + return interceptor.NewNamespaceRateLimitInterceptor( + namespaceRegistry, + configs.NewNamespapceRateLimiter( + namespaceRateFn, + serviceConfig.OperatorRPSRatio, + ), + map[string]int{}, // no token overrides + map[string]struct{}{}, // no long poll methods + dynamicconfig.GetBoolPropertyFnFilteredByNamespace(false), // no long poll methods + metricsHandler, + ) +} + func RateLimitInterceptorProvider( serviceConfig *configs.Config, ) *interceptor.RateLimitInterceptor { @@ -289,14 +326,14 @@ func ESProcessorConfigProvider( func PersistenceRateLimitingParamsProvider( serviceConfig *configs.Config, - persistenceLazyLoadedServiceResolver service.PersistenceLazyLoadedServiceResolver, + lazyLoadedServiceResolver service.LazyLoadedServiceResolver, ownershipBasedQuotaScaler shard.LazyLoadedOwnershipBasedQuotaScaler, logger log.SnTaggedLogger, ) service.PersistenceRateLimitingParams { hostCalculator := calculator.NewLoggedCalculator( shard.NewOwnershipAwareQuotaCalculator( ownershipBasedQuotaScaler, - persistenceLazyLoadedServiceResolver, + lazyLoadedServiceResolver, serviceConfig.PersistenceMaxQPS, serviceConfig.PersistenceGlobalMaxQPS, ), @@ -305,7 +342,7 @@ func PersistenceRateLimitingParamsProvider( namespaceCalculator := calculator.NewLoggedNamespaceCalculator( shard.NewOwnershipAwareNamespaceQuotaCalculator( ownershipBasedQuotaScaler, - persistenceLazyLoadedServiceResolver, + lazyLoadedServiceResolver, serviceConfig.PersistenceNamespaceMaxQPS, serviceConfig.PersistenceGlobalNamespaceMaxQPS, ), diff --git a/service/matching/fx.go b/service/matching/fx.go index 17473f7b27..c237259cb9 100644 --- a/service/matching/fx.go +++ b/service/matching/fx.go @@ -33,7 +33,7 @@ var Module = fx.Options( workerdeployment.Module, fx.Provide(ConfigProvider), fx.Provide(PersistenceRateLimitingParamsProvider), - service.PersistenceLazyLoadedServiceResolverModule, + service.LazyLoadedServiceResolverModule, fx.Provide(ThrottledLoggerRpsFnProvider), fx.Provide(RetryableInterceptorProvider), fx.Provide(ErrorHandlerProvider), @@ -113,7 +113,7 @@ func RateLimitInterceptorProvider( // if-case comes from resourceImpl.New. func PersistenceRateLimitingParamsProvider( serviceConfig *Config, - persistenceLazyLoadedServiceResolver service.PersistenceLazyLoadedServiceResolver, + persistenceLazyLoadedServiceResolver service.LazyLoadedServiceResolver, logger log.SnTaggedLogger, ) service.PersistenceRateLimitingParams { return service.NewPersistenceRateLimitingParams( diff --git a/service/worker/fx.go b/service/worker/fx.go index 1231e11918..b6acd06646 100644 --- a/service/worker/fx.go +++ b/service/worker/fx.go @@ -79,7 +79,7 @@ var Module = fx.Options( fx.Provide(ThrottledLoggerRpsFnProvider), fx.Provide(ConfigProvider), fx.Provide(PersistenceRateLimitingParamsProvider), - service.PersistenceLazyLoadedServiceResolverModule, + service.LazyLoadedServiceResolverModule, fx.Provide(ServiceResolverProvider), fx.Provide(func( clusterMetadata cluster.Metadata, @@ -108,7 +108,7 @@ func ThrottledLoggerRpsFnProvider(serviceConfig *Config) resource.ThrottledLogge func PersistenceRateLimitingParamsProvider( serviceConfig *Config, - persistenceLazyLoadedServiceResolver service.PersistenceLazyLoadedServiceResolver, + persistenceLazyLoadedServiceResolver service.LazyLoadedServiceResolver, logger log.SnTaggedLogger, ) service.PersistenceRateLimitingParams { return service.NewPersistenceRateLimitingParams(