From 2c9db013ef56e4ba5f090b3eac6333dc304f0e81 Mon Sep 17 00:00:00 2001 From: kuro Date: Mon, 15 Sep 2025 16:35:56 +0300 Subject: [PATCH 1/5] Test 001: Create Leaking bucket. --- .../ru/quipy/payments/logic/OrderPayer.kt | 20 ++++++++++++++++--- src/main/resources/application.properties | 6 ++++-- 2 files changed, 21 insertions(+), 5 deletions(-) diff --git a/src/main/kotlin/ru/quipy/payments/logic/OrderPayer.kt b/src/main/kotlin/ru/quipy/payments/logic/OrderPayer.kt index a5909b85b..fc373d967 100644 --- a/src/main/kotlin/ru/quipy/payments/logic/OrderPayer.kt +++ b/src/main/kotlin/ru/quipy/payments/logic/OrderPayer.kt @@ -5,11 +5,13 @@ import org.slf4j.LoggerFactory import org.springframework.beans.factory.annotation.Autowired import org.springframework.stereotype.Service import ru.quipy.common.utils.CallerBlockingRejectedExecutionHandler +import ru.quipy.common.utils.LeakingBucketRateLimiter import ru.quipy.common.utils.NamedThreadFactory import ru.quipy.core.EventSourcingService import ru.quipy.payments.api.PaymentAggregate +import java.time.Duration import java.util.* -import java.util.concurrent.LinkedBlockingQueue +import java.util.concurrent.ArrayBlockingQueue import java.util.concurrent.ThreadPoolExecutor import java.util.concurrent.TimeUnit @@ -28,16 +30,28 @@ class OrderPayer { private val paymentExecutor = ThreadPoolExecutor( 16, - 16, + 32, 0L, TimeUnit.MILLISECONDS, - LinkedBlockingQueue(8_000), + ArrayBlockingQueue(16_000), NamedThreadFactory("payment-submission-executor"), CallerBlockingRejectedExecutionHandler() ) + private val rateLimiter = LeakingBucketRateLimiter( + rate = 9, // Немного меньше текущего + window = Duration.ofSeconds(1), + bucketSize = 14 // Буфер для пиковых нагрузок + ) + fun processPayment(orderId: UUID, amount: Int, paymentId: UUID, deadline: Long): Long { val createdAt = System.currentTimeMillis() + + + while (!rateLimiter.tick()) { + Thread.sleep(1500) + } + paymentExecutor.submit { val createdEvent = paymentESService.create { it.create( diff --git a/src/main/resources/application.properties b/src/main/resources/application.properties index 33d51a58b..b17f0ff9c 100644 --- a/src/main/resources/application.properties +++ b/src/main/resources/application.properties @@ -26,5 +26,7 @@ management.endpoints.web.exposure.include=info,health,prometheus,metrics payment.service-name=${PAYMENT_SERVICE_NAME} payment.token=${PAYMENT_TOKEN} -payment.accounts=${PAYMENT_ACCOUNTS:acc-12,acc-20} -payment.hostPort=${PAYMENT_HOST:localhost}:${PAYMENT_PORT:1234} \ No newline at end of file +# payment.accounts=${PAYMENT_ACCOUNTS:acc-12,acc-20} +payment.accounts=${PAYMENT_ACCOUNTS:acc-3} +payment.hostPort=${PAYMENT_HOST:localhost}:${PAYMENT_PORT:1234} + From 599a5f7f0fd4bae499212443e0ab0b81c7de6205 Mon Sep 17 00:00:00 2001 From: kuro Date: Mon, 15 Sep 2025 16:35:56 +0300 Subject: [PATCH 2/5] Test 001: Create Leaking bucket. --- .../ru/quipy/payments/logic/OrderPayer.kt | 20 ++++++++++++++++--- src/main/resources/application.properties | 16 +++++++++++++-- 2 files changed, 31 insertions(+), 5 deletions(-) diff --git a/src/main/kotlin/ru/quipy/payments/logic/OrderPayer.kt b/src/main/kotlin/ru/quipy/payments/logic/OrderPayer.kt index a5909b85b..fc373d967 100644 --- a/src/main/kotlin/ru/quipy/payments/logic/OrderPayer.kt +++ b/src/main/kotlin/ru/quipy/payments/logic/OrderPayer.kt @@ -5,11 +5,13 @@ import org.slf4j.LoggerFactory import org.springframework.beans.factory.annotation.Autowired import org.springframework.stereotype.Service import ru.quipy.common.utils.CallerBlockingRejectedExecutionHandler +import ru.quipy.common.utils.LeakingBucketRateLimiter import ru.quipy.common.utils.NamedThreadFactory import ru.quipy.core.EventSourcingService import ru.quipy.payments.api.PaymentAggregate +import java.time.Duration import java.util.* -import java.util.concurrent.LinkedBlockingQueue +import java.util.concurrent.ArrayBlockingQueue import java.util.concurrent.ThreadPoolExecutor import java.util.concurrent.TimeUnit @@ -28,16 +30,28 @@ class OrderPayer { private val paymentExecutor = ThreadPoolExecutor( 16, - 16, + 32, 0L, TimeUnit.MILLISECONDS, - LinkedBlockingQueue(8_000), + ArrayBlockingQueue(16_000), NamedThreadFactory("payment-submission-executor"), CallerBlockingRejectedExecutionHandler() ) + private val rateLimiter = LeakingBucketRateLimiter( + rate = 9, // Немного меньше текущего + window = Duration.ofSeconds(1), + bucketSize = 14 // Буфер для пиковых нагрузок + ) + fun processPayment(orderId: UUID, amount: Int, paymentId: UUID, deadline: Long): Long { val createdAt = System.currentTimeMillis() + + + while (!rateLimiter.tick()) { + Thread.sleep(1500) + } + paymentExecutor.submit { val createdEvent = paymentESService.create { it.create( diff --git a/src/main/resources/application.properties b/src/main/resources/application.properties index 33d51a58b..68d31bae5 100644 --- a/src/main/resources/application.properties +++ b/src/main/resources/application.properties @@ -26,5 +26,17 @@ management.endpoints.web.exposure.include=info,health,prometheus,metrics payment.service-name=${PAYMENT_SERVICE_NAME} payment.token=${PAYMENT_TOKEN} -payment.accounts=${PAYMENT_ACCOUNTS:acc-12,acc-20} -payment.hostPort=${PAYMENT_HOST:localhost}:${PAYMENT_PORT:1234} \ No newline at end of file +# payment.accounts=${PAYMENT_ACCOUNTS:acc-12,acc-20} +payment.accounts=${PAYMENT_ACCOUNTS:acc-3} +payment.hostPort=${PAYMENT_HOST:localhost}:${PAYMENT_PORT:1234} + +# ==================== ??????? ??????????? ==================== +logging.pattern.console=%clr(%d{yyyy-MM-dd HH:mm:ss.SSS}){faint} %clr(%5p) %clr(${PID:- }){magenta} %clr(---){faint} %clr([%15.15t]){faint} %clr(%-40.40logger{39}){cyan} %clr(:){faint} %m%n%wEx +logging.level.ru.quipy=DEBUG +logging.level.org.springframework=INFO +logging.file.name=logs/application.log +logging.pattern.file=%d{yyyy-MM-dd HH:mm:ss.SSS} %5p ${PID:- } --- [%15.15t] %-40.40logger{39} : %m%n +spring.output.ansi.enabled=ALWAYS + +logging.level.com.zaxxer.hikari=WARN +logging.level.org.hibernate=INFO \ No newline at end of file From 3c9bcb26927aa74080574ba5fc6b82c1c2d13531 Mon Sep 17 00:00:00 2001 From: kuro Date: Mon, 15 Sep 2025 17:28:54 +0300 Subject: [PATCH 3/5] Test 001: Parameter update. --- .../kotlin/ru/quipy/payments/logic/OrderPayer.kt | 16 ++++++++-------- 1 file changed, 8 insertions(+), 8 deletions(-) diff --git a/src/main/kotlin/ru/quipy/payments/logic/OrderPayer.kt b/src/main/kotlin/ru/quipy/payments/logic/OrderPayer.kt index fc373d967..0c0704f3d 100644 --- a/src/main/kotlin/ru/quipy/payments/logic/OrderPayer.kt +++ b/src/main/kotlin/ru/quipy/payments/logic/OrderPayer.kt @@ -29,19 +29,19 @@ class OrderPayer { private lateinit var paymentService: PaymentService private val paymentExecutor = ThreadPoolExecutor( - 16, - 32, - 0L, - TimeUnit.MILLISECONDS, - ArrayBlockingQueue(16_000), + 12, + 24, + 60L, + TimeUnit.SECONDS, + ArrayBlockingQueue(1_000), NamedThreadFactory("payment-submission-executor"), CallerBlockingRejectedExecutionHandler() ) private val rateLimiter = LeakingBucketRateLimiter( - rate = 9, // Немного меньше текущего + rate = 11, // Немного меньше текущего window = Duration.ofSeconds(1), - bucketSize = 14 // Буфер для пиковых нагрузок + bucketSize = 22 // Буфер для пиковых нагрузок ) fun processPayment(orderId: UUID, amount: Int, paymentId: UUID, deadline: Long): Long { @@ -49,7 +49,7 @@ class OrderPayer { while (!rateLimiter.tick()) { - Thread.sleep(1500) + Thread.sleep(1200) } paymentExecutor.submit { From 7c4a1e1013bbd903bc1db722c18f0754f6bc2682 Mon Sep 17 00:00:00 2001 From: kuro Date: Mon, 15 Sep 2025 17:28:54 +0300 Subject: [PATCH 4/5] Test 001: Parameter update. --- src/main/kotlin/ru/quipy/payments/logic/OrderPayer.kt | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/src/main/kotlin/ru/quipy/payments/logic/OrderPayer.kt b/src/main/kotlin/ru/quipy/payments/logic/OrderPayer.kt index fc373d967..0f37b1d7c 100644 --- a/src/main/kotlin/ru/quipy/payments/logic/OrderPayer.kt +++ b/src/main/kotlin/ru/quipy/payments/logic/OrderPayer.kt @@ -29,19 +29,19 @@ class OrderPayer { private lateinit var paymentService: PaymentService private val paymentExecutor = ThreadPoolExecutor( - 16, - 32, + 9, + 12, 0L, TimeUnit.MILLISECONDS, - ArrayBlockingQueue(16_000), + ArrayBlockingQueue(8_000), NamedThreadFactory("payment-submission-executor"), CallerBlockingRejectedExecutionHandler() ) private val rateLimiter = LeakingBucketRateLimiter( - rate = 9, // Немного меньше текущего + rate = 11, window = Duration.ofSeconds(1), - bucketSize = 14 // Буфер для пиковых нагрузок + bucketSize = 22 ) fun processPayment(orderId: UUID, amount: Int, paymentId: UUID, deadline: Long): Long { From f89595e279f97a5478f7a1f9e96751c2a9b0ac16 Mon Sep 17 00:00:00 2001 From: kuro Date: Fri, 19 Sep 2025 16:56:13 +0300 Subject: [PATCH 5/5] Test 001: version 2 --- .../ru/quipy/apigateway/APIController.kt | 26 ++++++++++++------- .../ru/quipy/config/RateLimiterConfig.kt | 19 ++++++++++++++ .../ru/quipy/payments/logic/OrderPayer.kt | 22 +++------------- test-local-run.http | 4 +-- test-on-prem-run.http | 2 +- 5 files changed, 42 insertions(+), 31 deletions(-) create mode 100644 src/main/kotlin/ru/quipy/config/RateLimiterConfig.kt diff --git a/src/main/kotlin/ru/quipy/apigateway/APIController.kt b/src/main/kotlin/ru/quipy/apigateway/APIController.kt index 6f23fa18d..c7727f43d 100644 --- a/src/main/kotlin/ru/quipy/apigateway/APIController.kt +++ b/src/main/kotlin/ru/quipy/apigateway/APIController.kt @@ -4,12 +4,13 @@ import org.slf4j.Logger import org.slf4j.LoggerFactory import org.springframework.beans.factory.annotation.Autowired import org.springframework.web.bind.annotation.* +import ru.quipy.common.utils.RateLimiter import ru.quipy.orders.repository.OrderRepository import ru.quipy.payments.logic.OrderPayer import java.util.* @RestController -class APIController { +class APIController(val rateLimiter: RateLimiter) { val logger: Logger = LoggerFactory.getLogger(APIController::class.java) @@ -56,19 +57,24 @@ class APIController { @PostMapping("/orders/{orderId}/payment") fun payOrder(@PathVariable orderId: UUID, @RequestParam deadline: Long): PaymentSubmissionDto { - val paymentId = UUID.randomUUID() - val order = orderRepository.findById(orderId)?.let { - orderRepository.save(it.copy(status = OrderStatus.PAYMENT_IN_PROGRESS)) - it - } ?: throw IllegalArgumentException("No such order $orderId") + if (!rateLimiter.tick()) { + throw TooManyRequestsException("Payment rate limit exceeded. Please try again later.") + } + val paymentId = UUID.randomUUID() + val order = orderRepository.findById(orderId)?.let { + orderRepository.save(it.copy(status = OrderStatus.PAYMENT_IN_PROGRESS)) + it + } ?: throw IllegalArgumentException("No such order $orderId") - val createdAt = orderPayer.processPayment(orderId, order.price, paymentId, deadline) - return PaymentSubmissionDto(createdAt, paymentId) - } + val createdAt = orderPayer.processPayment(orderId, order.price, paymentId, deadline) + return PaymentSubmissionDto(createdAt, paymentId) + } class PaymentSubmissionDto( val timestamp: Long, val transactionId: UUID ) -} \ No newline at end of file +} + +class TooManyRequestsException(message: String) : RuntimeException(message) \ No newline at end of file diff --git a/src/main/kotlin/ru/quipy/config/RateLimiterConfig.kt b/src/main/kotlin/ru/quipy/config/RateLimiterConfig.kt new file mode 100644 index 000000000..92a88eed8 --- /dev/null +++ b/src/main/kotlin/ru/quipy/config/RateLimiterConfig.kt @@ -0,0 +1,19 @@ +package ru.quipy.config + +import org.springframework.context.annotation.Bean +import org.springframework.context.annotation.Configuration +import ru.quipy.common.utils.LeakingBucketRateLimiter +import java.time.Duration + +@Configuration +class RateLimiterConfig { + + @Bean + fun paymentRateLimiter(): LeakingBucketRateLimiter { + return LeakingBucketRateLimiter( + rate = 9, + window = Duration.ofSeconds(1), + bucketSize = 11 + ) + } +} \ No newline at end of file diff --git a/src/main/kotlin/ru/quipy/payments/logic/OrderPayer.kt b/src/main/kotlin/ru/quipy/payments/logic/OrderPayer.kt index 0f37b1d7c..a5909b85b 100644 --- a/src/main/kotlin/ru/quipy/payments/logic/OrderPayer.kt +++ b/src/main/kotlin/ru/quipy/payments/logic/OrderPayer.kt @@ -5,13 +5,11 @@ import org.slf4j.LoggerFactory import org.springframework.beans.factory.annotation.Autowired import org.springframework.stereotype.Service import ru.quipy.common.utils.CallerBlockingRejectedExecutionHandler -import ru.quipy.common.utils.LeakingBucketRateLimiter import ru.quipy.common.utils.NamedThreadFactory import ru.quipy.core.EventSourcingService import ru.quipy.payments.api.PaymentAggregate -import java.time.Duration import java.util.* -import java.util.concurrent.ArrayBlockingQueue +import java.util.concurrent.LinkedBlockingQueue import java.util.concurrent.ThreadPoolExecutor import java.util.concurrent.TimeUnit @@ -29,29 +27,17 @@ class OrderPayer { private lateinit var paymentService: PaymentService private val paymentExecutor = ThreadPoolExecutor( - 9, - 12, + 16, + 16, 0L, TimeUnit.MILLISECONDS, - ArrayBlockingQueue(8_000), + LinkedBlockingQueue(8_000), NamedThreadFactory("payment-submission-executor"), CallerBlockingRejectedExecutionHandler() ) - private val rateLimiter = LeakingBucketRateLimiter( - rate = 11, - window = Duration.ofSeconds(1), - bucketSize = 22 - ) - fun processPayment(orderId: UUID, amount: Int, paymentId: UUID, deadline: Long): Long { val createdAt = System.currentTimeMillis() - - - while (!rateLimiter.tick()) { - Thread.sleep(1500) - } - paymentExecutor.submit { val createdEvent = paymentESService.create { it.create( diff --git a/test-local-run.http b/test-local-run.http index dc8dbeedf..9b76fed12 100644 --- a/test-local-run.http +++ b/test-local-run.http @@ -5,8 +5,8 @@ Content-Type: application/json { "serviceName": "{{serviceName}}", "token": "{{token}}", - "ratePerSecond": 1, - "testCount": 100, + "ratePerSecond": 11, + "testCount": 1200, "processingTimeMillis": 80000 } diff --git a/test-on-prem-run.http b/test-on-prem-run.http index 8233cbe0d..a7f7344db 100644 --- a/test-on-prem-run.http +++ b/test-on-prem-run.http @@ -6,7 +6,7 @@ Content-Type: application/json { "serviceName": "{{serviceName}}", "token": "{{token}}", - "branch": "main", + "branch": "feature/highload-leaky-bucket-case1", "accounts": "acc-3", "ratePerSecond": 11, "testCount": 1200,