From 322cf362fe70fb51cda3e0cf75eeab38d627f14b Mon Sep 17 00:00:00 2001 From: vaycheslav Date: Fri, 24 Oct 2025 00:45:35 +0300 Subject: [PATCH 1/7] feature: added rate-limiter in api controller for service backpressure --- .../ru/quipy/apigateway/APIController.kt | 11 ++++- .../common/utils/LeakingBucketRateLimiter.kt | 2 +- .../ru/quipy/config/RpcControlConfig.kt | 18 +++++++-- .../ru/quipy/payments/logic/OrderPayer.kt | 40 +++++++++++++------ 4 files changed, 53 insertions(+), 18 deletions(-) diff --git a/src/main/kotlin/ru/quipy/apigateway/APIController.kt b/src/main/kotlin/ru/quipy/apigateway/APIController.kt index 251b8cdbc..fdef849b9 100644 --- a/src/main/kotlin/ru/quipy/apigateway/APIController.kt +++ b/src/main/kotlin/ru/quipy/apigateway/APIController.kt @@ -3,12 +3,18 @@ package ru.quipy.apigateway import org.slf4j.Logger import org.slf4j.LoggerFactory import org.springframework.web.bind.annotation.* +import ru.quipy.common.utils.RateLimiter +import ru.quipy.exceptions.TooManyRequestsException import ru.quipy.orders.repository.OrderRepository import ru.quipy.payments.logic.OrderPayer import java.util.* @RestController -class APIController(private val orderRepository: OrderRepository, private val orderPayer: OrderPayer) { +class APIController( + private val orderRepository: OrderRepository, + private val orderPayer: OrderPayer, + private val rateLimiter: RateLimiter +) { val logger: Logger = LoggerFactory.getLogger(APIController::class.java) @@ -49,6 +55,9 @@ class APIController(private val orderRepository: OrderRepository, private val or @PostMapping("/orders/{orderId}/payment") fun payOrder(@PathVariable orderId: UUID, @RequestParam deadline: Long): PaymentSubmissionDto { + if (!rateLimiter.tick()) { + throw TooManyRequestsException() + } val paymentId = UUID.randomUUID() val order = orderRepository.findById(orderId)?.let { orderRepository.save(it.copy(status = OrderStatus.PAYMENT_IN_PROGRESS)) diff --git a/src/main/kotlin/ru/quipy/common/utils/LeakingBucketRateLimiter.kt b/src/main/kotlin/ru/quipy/common/utils/LeakingBucketRateLimiter.kt index 383c047f7..88db1eb39 100644 --- a/src/main/kotlin/ru/quipy/common/utils/LeakingBucketRateLimiter.kt +++ b/src/main/kotlin/ru/quipy/common/utils/LeakingBucketRateLimiter.kt @@ -25,7 +25,7 @@ class LeakingBucketRateLimiter( private val releaseJob = rateLimiterScope.launch { while (true) { delay(window.toMillis()) - for (i in 0..rate) { + for (i in 1..rate) { queue.poll() } } diff --git a/src/main/kotlin/ru/quipy/config/RpcControlConfig.kt b/src/main/kotlin/ru/quipy/config/RpcControlConfig.kt index bd37c79e2..f0d828557 100644 --- a/src/main/kotlin/ru/quipy/config/RpcControlConfig.kt +++ b/src/main/kotlin/ru/quipy/config/RpcControlConfig.kt @@ -3,7 +3,10 @@ package ru.quipy.config import org.springframework.beans.factory.annotation.Qualifier import org.springframework.context.annotation.Bean import org.springframework.context.annotation.Configuration -import ru.quipy.common.utils.SlidingWindowRateLimiter +import ru.quipy.common.utils.CompositeRateLimiter +import ru.quipy.common.utils.LeakingBucketRateLimiter +import ru.quipy.common.utils.RateLimiter +import ru.quipy.common.utils.TokenBucketRateLimiter import ru.quipy.payments.logic.PaymentAccountProperties import java.time.Duration import java.util.concurrent.Semaphore @@ -12,9 +15,16 @@ import java.util.concurrent.Semaphore class RpcControlConfig { @Bean - fun getSlidingWindowRateLimiter(accountProperties: PaymentAccountProperties) = - SlidingWindowRateLimiter(accountProperties.rateLimitPerSec.toLong(), - Duration.ofSeconds(1) + fun getRateLimiter(accountProperties: PaymentAccountProperties): RateLimiter = + CompositeRateLimiter( + TokenBucketRateLimiter( + accountProperties.rateLimitPerSec, accountProperties.parallelRequests, + Duration.ofSeconds(1).toMillis() + ), LeakingBucketRateLimiter( + accountProperties.rateLimitPerSec.toLong(), + Duration.ofMillis(1), + accountProperties.parallelRequests + ) ) @Bean diff --git a/src/main/kotlin/ru/quipy/payments/logic/OrderPayer.kt b/src/main/kotlin/ru/quipy/payments/logic/OrderPayer.kt index 2e8320429..d99316af4 100644 --- a/src/main/kotlin/ru/quipy/payments/logic/OrderPayer.kt +++ b/src/main/kotlin/ru/quipy/payments/logic/OrderPayer.kt @@ -6,6 +6,7 @@ import org.slf4j.Logger import org.slf4j.LoggerFactory import org.springframework.beans.factory.annotation.Qualifier import org.springframework.stereotype.Service +import ru.quipy.common.utils.CallerBlockingRejectedExecutionHandler import ru.quipy.common.utils.NamedThreadFactory import ru.quipy.common.utils.SlidingWindowRateLimiter import ru.quipy.core.EventSourcingService @@ -14,7 +15,10 @@ import ru.quipy.payments.api.PaymentAggregate import ru.quipy.payments.dto.Transaction import java.time.Duration import java.util.* -import java.util.concurrent.* +import java.util.concurrent.ArrayBlockingQueue +import java.util.concurrent.Semaphore +import java.util.concurrent.ThreadPoolExecutor +import java.util.concurrent.TimeUnit @Service class OrderPayer( @@ -43,7 +47,7 @@ class OrderPayer( TimeUnit.MILLISECONDS, ArrayBlockingQueue(accountProperties.parallelRequests), NamedThreadFactory("payment-submission-executor"), - ThreadPoolExecutor.AbortPolicy() + CallerBlockingRejectedExecutionHandler() ) } @@ -60,8 +64,9 @@ class OrderPayer( val task = Runnable { parallelLimiter.acquire() - while (!rateLimit.tick()) { - Thread.sleep(Random().nextInt(0, 10).toLong()) + if (!rateLimit.tick()) { + parallelLimiter.release() + throw TooManyRequestsException() } paymentProcessingStartedCounter.increment() try { @@ -82,12 +87,23 @@ class OrderPayer( } val transaction = Transaction(orderId, amount, paymentId, deadline, task) - - try { - paymentExecutor.execute(transaction) - return createdAt - } catch (_: RejectedExecutionException) { - throw TooManyRequestsException() - } + paymentExecutor.execute(transaction) + return createdAt } -} \ No newline at end of file +} + + +/** + * "serviceName": "cas-m3404", + * "accountName": "acc-23", + * "parallelRequests": 64, + * "rateLimitPerSec": 11, + * "price": 30, + * "averageProcessingTime": "PT1S" + + * "accounts": "acc-23", + * "ratePerSecond": 15, + * "testCount": 3000, + * "processingTimeMillis": 2500 + * } + */ \ No newline at end of file From 930f449301f75582482f5f0c7ede767e7e096237 Mon Sep 17 00:00:00 2001 From: vaycheslav Date: Fri, 24 Oct 2025 01:20:49 +0300 Subject: [PATCH 2/7] feature: added twice in second checks --- .../ru/quipy/apigateway/GlobalExceptionHandler.kt | 3 ++- src/main/kotlin/ru/quipy/config/RpcControlConfig.kt | 13 +++++-------- 2 files changed, 7 insertions(+), 9 deletions(-) diff --git a/src/main/kotlin/ru/quipy/apigateway/GlobalExceptionHandler.kt b/src/main/kotlin/ru/quipy/apigateway/GlobalExceptionHandler.kt index e32911ade..cea35a33e 100644 --- a/src/main/kotlin/ru/quipy/apigateway/GlobalExceptionHandler.kt +++ b/src/main/kotlin/ru/quipy/apigateway/GlobalExceptionHandler.kt @@ -9,7 +9,7 @@ import ru.quipy.exceptions.TooManyRequestsException @RestControllerAdvice class GlobalExceptionHandler( - private val maxWait: String = "3", + private val maxWait: String = "4", ) { companion object { val logger = LoggerFactory.getLogger(GlobalExceptionHandler::class.java) @@ -17,6 +17,7 @@ class GlobalExceptionHandler( @ExceptionHandler(TooManyRequestsException::class) fun handleTooManyRequests(): ResponseEntity { + logger.warn("Too many requests - returning 429") return ResponseEntity .status(HttpStatus.TOO_MANY_REQUESTS) .header("Retry-After", maxWait) diff --git a/src/main/kotlin/ru/quipy/config/RpcControlConfig.kt b/src/main/kotlin/ru/quipy/config/RpcControlConfig.kt index f0d828557..ea0f446cf 100644 --- a/src/main/kotlin/ru/quipy/config/RpcControlConfig.kt +++ b/src/main/kotlin/ru/quipy/config/RpcControlConfig.kt @@ -6,26 +6,23 @@ import org.springframework.context.annotation.Configuration import ru.quipy.common.utils.CompositeRateLimiter import ru.quipy.common.utils.LeakingBucketRateLimiter import ru.quipy.common.utils.RateLimiter +import ru.quipy.common.utils.SlidingWindowRateLimiter import ru.quipy.common.utils.TokenBucketRateLimiter import ru.quipy.payments.logic.PaymentAccountProperties import java.time.Duration import java.util.concurrent.Semaphore +import java.util.concurrent.TimeUnit @Configuration class RpcControlConfig { @Bean fun getRateLimiter(accountProperties: PaymentAccountProperties): RateLimiter = - CompositeRateLimiter( - TokenBucketRateLimiter( - accountProperties.rateLimitPerSec, accountProperties.parallelRequests, - Duration.ofSeconds(1).toMillis() - ), LeakingBucketRateLimiter( + CompositeRateLimiter(SlidingWindowRateLimiter(accountProperties.rateLimitPerSec.toLong()/2, Duration.ofMillis(500)), LeakingBucketRateLimiter( accountProperties.rateLimitPerSec.toLong(), Duration.ofMillis(1), - accountProperties.parallelRequests - ) - ) + accountProperties.rateLimitPerSec )) + @Bean @Qualifier("parallelLimiter") From dc188d7b9a1dbebdb60028085e739881ab77965e Mon Sep 17 00:00:00 2001 From: vaycheslav Date: Fri, 24 Oct 2025 14:43:04 +0300 Subject: [PATCH 3/7] fix: fixed rl type and parameters --- .../apigateway/GlobalExceptionHandler.kt | 4 ++-- .../common/utils/LeakingBucketRateLimiter.kt | 2 ++ .../ru/quipy/config/RpcControlConfig.kt | 7 +----- .../ru/quipy/payments/logic/OrderPayer.kt | 23 +++++++++---------- 4 files changed, 16 insertions(+), 20 deletions(-) diff --git a/src/main/kotlin/ru/quipy/apigateway/GlobalExceptionHandler.kt b/src/main/kotlin/ru/quipy/apigateway/GlobalExceptionHandler.kt index cea35a33e..9346eaa32 100644 --- a/src/main/kotlin/ru/quipy/apigateway/GlobalExceptionHandler.kt +++ b/src/main/kotlin/ru/quipy/apigateway/GlobalExceptionHandler.kt @@ -6,10 +6,10 @@ import org.springframework.http.ResponseEntity import org.springframework.web.bind.annotation.ExceptionHandler import org.springframework.web.bind.annotation.RestControllerAdvice import ru.quipy.exceptions.TooManyRequestsException +import kotlin.random.Random @RestControllerAdvice class GlobalExceptionHandler( - private val maxWait: String = "4", ) { companion object { val logger = LoggerFactory.getLogger(GlobalExceptionHandler::class.java) @@ -20,7 +20,7 @@ class GlobalExceptionHandler( logger.warn("Too many requests - returning 429") return ResponseEntity .status(HttpStatus.TOO_MANY_REQUESTS) - .header("Retry-After", maxWait) + .header("Retry-After", "${Random.nextInt(30000, 40000)}") .build() } } \ No newline at end of file diff --git a/src/main/kotlin/ru/quipy/common/utils/LeakingBucketRateLimiter.kt b/src/main/kotlin/ru/quipy/common/utils/LeakingBucketRateLimiter.kt index 88db1eb39..94131a0ee 100644 --- a/src/main/kotlin/ru/quipy/common/utils/LeakingBucketRateLimiter.kt +++ b/src/main/kotlin/ru/quipy/common/utils/LeakingBucketRateLimiter.kt @@ -19,6 +19,7 @@ class LeakingBucketRateLimiter( private val queue = LinkedBlockingQueue(bucketSize) override fun tick(): Boolean { + logger.info("RateLimiter: queue size before offer: ${queue.size}") return queue.offer(1) } @@ -28,6 +29,7 @@ class LeakingBucketRateLimiter( for (i in 1..rate) { queue.poll() } + logger.info("RateLimiter: queue size after cleaning: ${queue.size}") } }.invokeOnCompletion { th -> if (th != null) logger.error("Rate limiter release job completed", th) } diff --git a/src/main/kotlin/ru/quipy/config/RpcControlConfig.kt b/src/main/kotlin/ru/quipy/config/RpcControlConfig.kt index ea0f446cf..e2186a315 100644 --- a/src/main/kotlin/ru/quipy/config/RpcControlConfig.kt +++ b/src/main/kotlin/ru/quipy/config/RpcControlConfig.kt @@ -9,7 +9,6 @@ import ru.quipy.common.utils.RateLimiter import ru.quipy.common.utils.SlidingWindowRateLimiter import ru.quipy.common.utils.TokenBucketRateLimiter import ru.quipy.payments.logic.PaymentAccountProperties -import java.time.Duration import java.util.concurrent.Semaphore import java.util.concurrent.TimeUnit @@ -18,11 +17,7 @@ class RpcControlConfig { @Bean fun getRateLimiter(accountProperties: PaymentAccountProperties): RateLimiter = - CompositeRateLimiter(SlidingWindowRateLimiter(accountProperties.rateLimitPerSec.toLong()/2, Duration.ofMillis(500)), LeakingBucketRateLimiter( - accountProperties.rateLimitPerSec.toLong(), - Duration.ofMillis(1), - accountProperties.rateLimitPerSec )) - + TokenBucketRateLimiter(6, 11, 500, TimeUnit.MILLISECONDS ) @Bean @Qualifier("parallelLimiter") diff --git a/src/main/kotlin/ru/quipy/payments/logic/OrderPayer.kt b/src/main/kotlin/ru/quipy/payments/logic/OrderPayer.kt index d99316af4..3396a307c 100644 --- a/src/main/kotlin/ru/quipy/payments/logic/OrderPayer.kt +++ b/src/main/kotlin/ru/quipy/payments/logic/OrderPayer.kt @@ -6,7 +6,6 @@ import org.slf4j.Logger import org.slf4j.LoggerFactory import org.springframework.beans.factory.annotation.Qualifier import org.springframework.stereotype.Service -import ru.quipy.common.utils.CallerBlockingRejectedExecutionHandler import ru.quipy.common.utils.NamedThreadFactory import ru.quipy.common.utils.SlidingWindowRateLimiter import ru.quipy.core.EventSourcingService @@ -15,10 +14,7 @@ import ru.quipy.payments.api.PaymentAggregate import ru.quipy.payments.dto.Transaction import java.time.Duration import java.util.* -import java.util.concurrent.ArrayBlockingQueue -import java.util.concurrent.Semaphore -import java.util.concurrent.ThreadPoolExecutor -import java.util.concurrent.TimeUnit +import java.util.concurrent.* @Service class OrderPayer( @@ -47,7 +43,7 @@ class OrderPayer( TimeUnit.MILLISECONDS, ArrayBlockingQueue(accountProperties.parallelRequests), NamedThreadFactory("payment-submission-executor"), - CallerBlockingRejectedExecutionHandler() + ThreadPoolExecutor.AbortPolicy() ) } @@ -64,9 +60,8 @@ class OrderPayer( val task = Runnable { parallelLimiter.acquire() - if (!rateLimit.tick()) { - parallelLimiter.release() - throw TooManyRequestsException() + while (!rateLimit.tick()) { + Thread.sleep(Random().nextInt(0, 10).toLong()) } paymentProcessingStartedCounter.increment() try { @@ -87,12 +82,16 @@ class OrderPayer( } val transaction = Transaction(orderId, amount, paymentId, deadline, task) - paymentExecutor.execute(transaction) - return createdAt + + try { + paymentExecutor.execute(transaction) + return createdAt + } catch (_: RejectedExecutionException) { + throw TooManyRequestsException() + } } } - /** * "serviceName": "cas-m3404", * "accountName": "acc-23", From 6e21a65e5bfb3066e4c0aeb08374941d7299eb0c Mon Sep 17 00:00:00 2001 From: vaycheslav Date: Fri, 24 Oct 2025 17:30:40 +0300 Subject: [PATCH 4/7] fix: fixed parameters --- src/main/kotlin/ru/quipy/config/RpcControlConfig.kt | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/src/main/kotlin/ru/quipy/config/RpcControlConfig.kt b/src/main/kotlin/ru/quipy/config/RpcControlConfig.kt index e2186a315..25788e7cf 100644 --- a/src/main/kotlin/ru/quipy/config/RpcControlConfig.kt +++ b/src/main/kotlin/ru/quipy/config/RpcControlConfig.kt @@ -4,11 +4,11 @@ import org.springframework.beans.factory.annotation.Qualifier import org.springframework.context.annotation.Bean import org.springframework.context.annotation.Configuration import ru.quipy.common.utils.CompositeRateLimiter -import ru.quipy.common.utils.LeakingBucketRateLimiter import ru.quipy.common.utils.RateLimiter import ru.quipy.common.utils.SlidingWindowRateLimiter import ru.quipy.common.utils.TokenBucketRateLimiter import ru.quipy.payments.logic.PaymentAccountProperties +import java.time.Duration import java.util.concurrent.Semaphore import java.util.concurrent.TimeUnit @@ -17,7 +17,7 @@ class RpcControlConfig { @Bean fun getRateLimiter(accountProperties: PaymentAccountProperties): RateLimiter = - TokenBucketRateLimiter(6, 11, 500, TimeUnit.MILLISECONDS ) + CompositeRateLimiter(SlidingWindowRateLimiter(accountProperties.rateLimitPerSec.toLong() * 30, Duration.ofSeconds(30)), TokenBucketRateLimiter(11, 120, 1000, TimeUnit.MILLISECONDS )) @Bean @Qualifier("parallelLimiter") From 9a50597f91cbebf4a1d2ae2fee14712522345565 Mon Sep 17 00:00:00 2001 From: vaycheslav Date: Fri, 31 Oct 2025 18:10:26 +0300 Subject: [PATCH 5/7] fix: fixed parameters --- src/main/kotlin/ru/quipy/apigateway/APIController.kt | 4 +++- .../ru/quipy/apigateway/GlobalExceptionHandler.kt | 2 +- .../ru/quipy/common/utils/LeakingBucketRateLimiter.kt | 2 -- src/main/kotlin/ru/quipy/config/RpcControlConfig.kt | 4 ---- src/main/kotlin/ru/quipy/payments/logic/OrderPayer.kt | 10 +++++----- 5 files changed, 9 insertions(+), 13 deletions(-) diff --git a/src/main/kotlin/ru/quipy/apigateway/APIController.kt b/src/main/kotlin/ru/quipy/apigateway/APIController.kt index fdef849b9..5c15d9a6c 100644 --- a/src/main/kotlin/ru/quipy/apigateway/APIController.kt +++ b/src/main/kotlin/ru/quipy/apigateway/APIController.kt @@ -3,17 +3,19 @@ package ru.quipy.apigateway import org.slf4j.Logger import org.slf4j.LoggerFactory import org.springframework.web.bind.annotation.* +import ru.quipy.common.utils.LeakingBucketRateLimiter import ru.quipy.common.utils.RateLimiter import ru.quipy.exceptions.TooManyRequestsException import ru.quipy.orders.repository.OrderRepository import ru.quipy.payments.logic.OrderPayer +import java.time.Duration import java.util.* @RestController class APIController( private val orderRepository: OrderRepository, private val orderPayer: OrderPayer, - private val rateLimiter: RateLimiter + private val rateLimiter: RateLimiter = LeakingBucketRateLimiter(11, Duration.ofSeconds(1), 120) ) { val logger: Logger = LoggerFactory.getLogger(APIController::class.java) diff --git a/src/main/kotlin/ru/quipy/apigateway/GlobalExceptionHandler.kt b/src/main/kotlin/ru/quipy/apigateway/GlobalExceptionHandler.kt index 9346eaa32..8cd569888 100644 --- a/src/main/kotlin/ru/quipy/apigateway/GlobalExceptionHandler.kt +++ b/src/main/kotlin/ru/quipy/apigateway/GlobalExceptionHandler.kt @@ -20,7 +20,7 @@ class GlobalExceptionHandler( logger.warn("Too many requests - returning 429") return ResponseEntity .status(HttpStatus.TOO_MANY_REQUESTS) - .header("Retry-After", "${Random.nextInt(30000, 40000)}") + .header("Retry-After", "10") .build() } } \ No newline at end of file diff --git a/src/main/kotlin/ru/quipy/common/utils/LeakingBucketRateLimiter.kt b/src/main/kotlin/ru/quipy/common/utils/LeakingBucketRateLimiter.kt index 94131a0ee..88db1eb39 100644 --- a/src/main/kotlin/ru/quipy/common/utils/LeakingBucketRateLimiter.kt +++ b/src/main/kotlin/ru/quipy/common/utils/LeakingBucketRateLimiter.kt @@ -19,7 +19,6 @@ class LeakingBucketRateLimiter( private val queue = LinkedBlockingQueue(bucketSize) override fun tick(): Boolean { - logger.info("RateLimiter: queue size before offer: ${queue.size}") return queue.offer(1) } @@ -29,7 +28,6 @@ class LeakingBucketRateLimiter( for (i in 1..rate) { queue.poll() } - logger.info("RateLimiter: queue size after cleaning: ${queue.size}") } }.invokeOnCompletion { th -> if (th != null) logger.error("Rate limiter release job completed", th) } diff --git a/src/main/kotlin/ru/quipy/config/RpcControlConfig.kt b/src/main/kotlin/ru/quipy/config/RpcControlConfig.kt index 25788e7cf..ec2747436 100644 --- a/src/main/kotlin/ru/quipy/config/RpcControlConfig.kt +++ b/src/main/kotlin/ru/quipy/config/RpcControlConfig.kt @@ -15,10 +15,6 @@ import java.util.concurrent.TimeUnit @Configuration class RpcControlConfig { - @Bean - fun getRateLimiter(accountProperties: PaymentAccountProperties): RateLimiter = - CompositeRateLimiter(SlidingWindowRateLimiter(accountProperties.rateLimitPerSec.toLong() * 30, Duration.ofSeconds(30)), TokenBucketRateLimiter(11, 120, 1000, TimeUnit.MILLISECONDS )) - @Bean @Qualifier("parallelLimiter") fun parallelLimiter(accountProperties: PaymentAccountProperties): Semaphore { diff --git a/src/main/kotlin/ru/quipy/payments/logic/OrderPayer.kt b/src/main/kotlin/ru/quipy/payments/logic/OrderPayer.kt index 3396a307c..f3d0d3de7 100644 --- a/src/main/kotlin/ru/quipy/payments/logic/OrderPayer.kt +++ b/src/main/kotlin/ru/quipy/payments/logic/OrderPayer.kt @@ -50,19 +50,19 @@ class OrderPayer( private val rateLimit: SlidingWindowRateLimiter by lazy { SlidingWindowRateLimiter( rate = accountProperties.rateLimitPerSec.toLong(), - window = Duration.ofSeconds(1), + window = Duration.ofMillis(1000), ) } fun processPayment(orderId: UUID, amount: Int, paymentId: UUID, deadline: Long): Long { val createdAt = System.currentTimeMillis() paymentProcessingPlannedCounter.increment() + parallelLimiter.acquire() + while (!rateLimit.tick()) { + Thread.sleep(10) + } val task = Runnable { - parallelLimiter.acquire() - while (!rateLimit.tick()) { - Thread.sleep(Random().nextInt(0, 10).toLong()) - } paymentProcessingStartedCounter.increment() try { val createdEvent = paymentESService.create { From 6e785d684a0ea4927e62741bc52fcbfe60e57f67 Mon Sep 17 00:00:00 2001 From: vaycheslav Date: Fri, 31 Oct 2025 18:25:37 +0300 Subject: [PATCH 6/7] fix: fixed parameters --- src/main/kotlin/ru/quipy/apigateway/APIController.kt | 6 ++---- 1 file changed, 2 insertions(+), 4 deletions(-) diff --git a/src/main/kotlin/ru/quipy/apigateway/APIController.kt b/src/main/kotlin/ru/quipy/apigateway/APIController.kt index 5c15d9a6c..028a68e9a 100644 --- a/src/main/kotlin/ru/quipy/apigateway/APIController.kt +++ b/src/main/kotlin/ru/quipy/apigateway/APIController.kt @@ -5,6 +5,7 @@ import org.slf4j.LoggerFactory import org.springframework.web.bind.annotation.* import ru.quipy.common.utils.LeakingBucketRateLimiter import ru.quipy.common.utils.RateLimiter +import ru.quipy.common.utils.SlidingWindowRateLimiter import ru.quipy.exceptions.TooManyRequestsException import ru.quipy.orders.repository.OrderRepository import ru.quipy.payments.logic.OrderPayer @@ -15,7 +16,7 @@ import java.util.* class APIController( private val orderRepository: OrderRepository, private val orderPayer: OrderPayer, - private val rateLimiter: RateLimiter = LeakingBucketRateLimiter(11, Duration.ofSeconds(1), 120) + private val rateLimiter: RateLimiter = LeakingBucketRateLimiter(10, Duration.ofSeconds(1), 10) ) { val logger: Logger = LoggerFactory.getLogger(APIController::class.java) @@ -57,9 +58,6 @@ class APIController( @PostMapping("/orders/{orderId}/payment") fun payOrder(@PathVariable orderId: UUID, @RequestParam deadline: Long): PaymentSubmissionDto { - if (!rateLimiter.tick()) { - throw TooManyRequestsException() - } val paymentId = UUID.randomUUID() val order = orderRepository.findById(orderId)?.let { orderRepository.save(it.copy(status = OrderStatus.PAYMENT_IN_PROGRESS)) From 556f3d568de2871788e55507ec5ac62fad896e36 Mon Sep 17 00:00:00 2001 From: vaycheslav Date: Sat, 1 Nov 2025 02:42:39 +0300 Subject: [PATCH 7/7] feature: added retries for Temporary error --- .../ru/quipy/apigateway/APIController.kt | 11 ++++- .../apigateway/GlobalExceptionHandler.kt | 35 ++++++++++++--- .../exceptions/TooManyRequestsException.kt | 2 +- .../ru/quipy/payments/logic/OrderPayer.kt | 43 ++++++++----------- .../logic/PaymentExternalServiceImpl.kt | 19 +++++++- src/main/resources/application.properties | 2 +- 6 files changed, 75 insertions(+), 37 deletions(-) diff --git a/src/main/kotlin/ru/quipy/apigateway/APIController.kt b/src/main/kotlin/ru/quipy/apigateway/APIController.kt index 028a68e9a..1a92d7863 100644 --- a/src/main/kotlin/ru/quipy/apigateway/APIController.kt +++ b/src/main/kotlin/ru/quipy/apigateway/APIController.kt @@ -2,21 +2,25 @@ package ru.quipy.apigateway import org.slf4j.Logger import org.slf4j.LoggerFactory +import org.springframework.beans.factory.annotation.Qualifier import org.springframework.web.bind.annotation.* import ru.quipy.common.utils.LeakingBucketRateLimiter import ru.quipy.common.utils.RateLimiter -import ru.quipy.common.utils.SlidingWindowRateLimiter import ru.quipy.exceptions.TooManyRequestsException import ru.quipy.orders.repository.OrderRepository import ru.quipy.payments.logic.OrderPayer import java.time.Duration import java.util.* +import java.util.concurrent.Semaphore +import java.util.concurrent.TimeUnit @RestController class APIController( private val orderRepository: OrderRepository, private val orderPayer: OrderPayer, - private val rateLimiter: RateLimiter = LeakingBucketRateLimiter(10, Duration.ofSeconds(1), 10) + @field:Qualifier("parallelLimiter") + private val parallelLimiter: Semaphore, + private val rateLimiter: RateLimiter = LeakingBucketRateLimiter(10, Duration.ofSeconds(1), 20) ) { val logger: Logger = LoggerFactory.getLogger(APIController::class.java) @@ -58,6 +62,9 @@ class APIController( @PostMapping("/orders/{orderId}/payment") fun payOrder(@PathVariable orderId: UUID, @RequestParam deadline: Long): PaymentSubmissionDto { + if (!rateLimiter.tick() || !parallelLimiter.tryAcquire(1, TimeUnit.SECONDS)) { + throw TooManyRequestsException(deadline) + } val paymentId = UUID.randomUUID() val order = orderRepository.findById(orderId)?.let { orderRepository.save(it.copy(status = OrderStatus.PAYMENT_IN_PROGRESS)) diff --git a/src/main/kotlin/ru/quipy/apigateway/GlobalExceptionHandler.kt b/src/main/kotlin/ru/quipy/apigateway/GlobalExceptionHandler.kt index 8cd569888..1fede48bf 100644 --- a/src/main/kotlin/ru/quipy/apigateway/GlobalExceptionHandler.kt +++ b/src/main/kotlin/ru/quipy/apigateway/GlobalExceptionHandler.kt @@ -6,21 +6,42 @@ import org.springframework.http.ResponseEntity import org.springframework.web.bind.annotation.ExceptionHandler import org.springframework.web.bind.annotation.RestControllerAdvice import ru.quipy.exceptions.TooManyRequestsException -import kotlin.random.Random +import java.util.concurrent.atomic.AtomicInteger +import java.util.concurrent.atomic.AtomicLong @RestControllerAdvice class GlobalExceptionHandler( ) { companion object { val logger = LoggerFactory.getLogger(GlobalExceptionHandler::class.java) + private var currentRetryAfterSeconds = 1 + private var exp_base = 2; + private const val MAX_RETRY_AFTER_MS = 256 + private const val RESET_WINDOW_MS = 30000L } + private val rejectedRequestsCount = AtomicInteger(0) + private val lastRejectionTime = AtomicLong(0) + @ExceptionHandler(TooManyRequestsException::class) - fun handleTooManyRequests(): ResponseEntity { - logger.warn("Too many requests - returning 429") - return ResponseEntity - .status(HttpStatus.TOO_MANY_REQUESTS) - .header("Retry-After", "10") - .build() + fun handleTooManyRequests(exception: TooManyRequestsException): ResponseEntity { + logger.warn("to many request") + val currentTime = System.currentTimeMillis() + val lastRejection = lastRejectionTime.get() + + if (currentTime - lastRejection > RESET_WINDOW_MS) { + rejectedRequestsCount.set(0) + currentRetryAfterSeconds = 1 + } + lastRejectionTime.set(currentTime) + + if (rejectedRequestsCount.get() < 30 && exception.deadline < System.currentTimeMillis() + 1200) { + return ResponseEntity + .status(HttpStatus.TOO_MANY_REQUESTS) + .header("Retry-After", MAX_RETRY_AFTER_MS.coerceAtLeast(currentRetryAfterSeconds).toString()) + .build() + } + + return ResponseEntity.status(200).build() } } \ No newline at end of file diff --git a/src/main/kotlin/ru/quipy/exceptions/TooManyRequestsException.kt b/src/main/kotlin/ru/quipy/exceptions/TooManyRequestsException.kt index 8d887d9f7..cacd67810 100644 --- a/src/main/kotlin/ru/quipy/exceptions/TooManyRequestsException.kt +++ b/src/main/kotlin/ru/quipy/exceptions/TooManyRequestsException.kt @@ -1,3 +1,3 @@ package ru.quipy.exceptions -class TooManyRequestsException() : RuntimeException() \ No newline at end of file +class TooManyRequestsException(val deadline: Long) : RuntimeException() \ No newline at end of file diff --git a/src/main/kotlin/ru/quipy/payments/logic/OrderPayer.kt b/src/main/kotlin/ru/quipy/payments/logic/OrderPayer.kt index f3d0d3de7..fe035cd6a 100644 --- a/src/main/kotlin/ru/quipy/payments/logic/OrderPayer.kt +++ b/src/main/kotlin/ru/quipy/payments/logic/OrderPayer.kt @@ -6,15 +6,17 @@ import org.slf4j.Logger import org.slf4j.LoggerFactory import org.springframework.beans.factory.annotation.Qualifier import org.springframework.stereotype.Service +import ru.quipy.common.utils.CallerBlockingRejectedExecutionHandler import ru.quipy.common.utils.NamedThreadFactory import ru.quipy.common.utils.SlidingWindowRateLimiter import ru.quipy.core.EventSourcingService -import ru.quipy.exceptions.TooManyRequestsException import ru.quipy.payments.api.PaymentAggregate -import ru.quipy.payments.dto.Transaction import java.time.Duration import java.util.* -import java.util.concurrent.* +import java.util.concurrent.ArrayBlockingQueue +import java.util.concurrent.Semaphore +import java.util.concurrent.ThreadPoolExecutor +import java.util.concurrent.TimeUnit @Service class OrderPayer( @@ -43,7 +45,7 @@ class OrderPayer( TimeUnit.MILLISECONDS, ArrayBlockingQueue(accountProperties.parallelRequests), NamedThreadFactory("payment-submission-executor"), - ThreadPoolExecutor.AbortPolicy() + CallerBlockingRejectedExecutionHandler(maxWait = Duration.ofMillis(1500)) ) } @@ -55,40 +57,31 @@ class OrderPayer( } fun processPayment(orderId: UUID, amount: Int, paymentId: UUID, deadline: Long): Long { - val createdAt = System.currentTimeMillis() paymentProcessingPlannedCounter.increment() - parallelLimiter.acquire() while (!rateLimit.tick()) { - Thread.sleep(10) + Thread.sleep(Random().nextLong(1, 10)) } + val createdAt = System.currentTimeMillis() - val task = Runnable { - paymentProcessingStartedCounter.increment() + paymentProcessingStartedCounter.increment() + paymentExecutor.submit { try { val createdEvent = paymentESService.create { it.create(paymentId, orderId, amount) } - logger.trace("Payment {} for order {} created.", createdEvent.paymentId, orderId) - paymentService.submitPaymentRequest( - paymentId, - amount, - createdAt, - deadline - ) + logger.trace("Payment ${createdEvent.paymentId} for order $orderId created.") + paymentService.submitPaymentRequest(paymentId, amount, createdAt, deadline) + } catch (e: Exception) { + parallelLimiter.release() + paymentProcessingCompletedCounter.increment() + throw e } finally { parallelLimiter.release() paymentProcessingCompletedCounter.increment() - } - } - val transaction = Transaction(orderId, amount, paymentId, deadline, task) - - try { - paymentExecutor.execute(transaction) - return createdAt - } catch (_: RejectedExecutionException) { - throw TooManyRequestsException() + } } + return createdAt } } diff --git a/src/main/kotlin/ru/quipy/payments/logic/PaymentExternalServiceImpl.kt b/src/main/kotlin/ru/quipy/payments/logic/PaymentExternalServiceImpl.kt index c0f4f84f5..5da9c9984 100644 --- a/src/main/kotlin/ru/quipy/payments/logic/PaymentExternalServiceImpl.kt +++ b/src/main/kotlin/ru/quipy/payments/logic/PaymentExternalServiceImpl.kt @@ -11,6 +11,7 @@ import ru.quipy.payments.api.PaymentAggregate import java.net.SocketTimeoutException import java.time.Duration import java.util.* +import kotlin.math.pow // Advice: always treat time as a Duration @@ -59,6 +60,9 @@ class PaymentExternalSystemAdapterImpl( post(emptyBody) }.build() + var isCompletedRequest = false + var retryCount = 0 + while (!isCompletedRequest && now() < deadline) { client.newCall(request).execute().use { response -> val body = try { mapper.readValue(response.body?.string(), ExternalSysResponse::class.java) @@ -67,6 +71,19 @@ class PaymentExternalSystemAdapterImpl( ExternalSysResponse(transactionId.toString(), paymentId.toString(), false, e.message) } + if (body.message?.contains("Temporary error") == true) { + if (retryCount < 7) { + retryCount++ + val backoffTime = (2.0.pow(retryCount.toDouble()) * 10 + Random().nextLong(0, 10)).toLong() + Thread.sleep(backoffTime) + continue + } else { + isCompletedRequest = true + } + } else { + isCompletedRequest = true + } + logger.warn("[$accountName] Payment processed for txId: $transactionId, payment: $paymentId, succeeded: ${body.result}, message: ${body.message}") // Здесь мы обновляем состояние оплаты в зависимости от результата в базе данных оплат. @@ -75,6 +92,7 @@ class PaymentExternalSystemAdapterImpl( it.logProcessing(body.result, now(), transactionId, reason = body.message) } } + } } catch (e: Exception) { when (e) { is SocketTimeoutException -> { @@ -83,7 +101,6 @@ class PaymentExternalSystemAdapterImpl( it.logProcessing(false, now(), transactionId, reason = "Request timeout.") } } - else -> { logger.error("[$accountName] Payment failed for txId: $transactionId, payment: $paymentId", e) diff --git a/src/main/resources/application.properties b/src/main/resources/application.properties index bb6025436..73440a6fd 100644 --- a/src/main/resources/application.properties +++ b/src/main/resources/application.properties @@ -23,6 +23,6 @@ payment.service-name=${PAYMENT_SERVICE_NAME} payment.token=${PAYMENT_TOKEN} # payment.accounts=${PAYMENT_ACCOUNTS:acc-12,acc-20} # payment.accounts=${PAYMENT_ACCOUNTS:acc-3} -payment.accounts=${PAYMENT_ACCOUNTS:acc-23} +payment.accounts=${PAYMENT_ACCOUNTS:acc-8} # payment.accounts=${PAYMENT_ACCOUNTS:acc-18} payment.hostPort=${PAYMENT_HOST:localhost}:${PAYMENT_PORT:1234} \ No newline at end of file