From 4f8e2ff56da17c24890210b009a936909167fe1b Mon Sep 17 00:00:00 2001 From: 21092004Goda Date: Sat, 20 Dec 2025 00:10:08 +0300 Subject: [PATCH] =?UTF-8?q?=D0=91=D0=B0=D0=BD=20=D0=B8=20=D0=B2=D1=81?= =?UTF-8?q?=D0=B5.?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit # Conflicts: # src/main/kotlin/ru/quipy/payments/logic/PaymentExternalServiceImpl.kt --- .../ru/quipy/apigateway/APIController.kt | 3 +- .../payments/config/PaymentAccountsConfig.kt | 89 ++++++++++++++++--- .../ru/quipy/payments/logic/OrderPayer.kt | 25 ++---- src/main/resources/application.properties | 7 +- test-local-run.http | 36 ++++++-- 5 files changed, 124 insertions(+), 36 deletions(-) diff --git a/src/main/kotlin/ru/quipy/apigateway/APIController.kt b/src/main/kotlin/ru/quipy/apigateway/APIController.kt index c07d96b7a..8f81acc8d 100644 --- a/src/main/kotlin/ru/quipy/apigateway/APIController.kt +++ b/src/main/kotlin/ru/quipy/apigateway/APIController.kt @@ -16,8 +16,7 @@ import java.util.* class APIController( private val orderRepository: OrderRepository, private val orderPayer: OrderPayer, - @field:Qualifier("parallelLimiter") - private val rateLimiter: RateLimiter = LeakingBucketRateLimiter(8, Duration.ofSeconds(1), 30) + private val rateLimiter: LeakingBucketRateLimiter ) { val logger: Logger = LoggerFactory.getLogger(APIController::class.java) diff --git a/src/main/kotlin/ru/quipy/payments/config/PaymentAccountsConfig.kt b/src/main/kotlin/ru/quipy/payments/config/PaymentAccountsConfig.kt index e10bd805a..2e2014c5c 100644 --- a/src/main/kotlin/ru/quipy/payments/config/PaymentAccountsConfig.kt +++ b/src/main/kotlin/ru/quipy/payments/config/PaymentAccountsConfig.kt @@ -7,6 +7,10 @@ import io.micrometer.core.instrument.MeterRegistry import org.springframework.beans.factory.annotation.Value import org.springframework.context.annotation.Bean import org.springframework.context.annotation.Configuration +import ru.quipy.common.utils.CallerBlockingRejectedExecutionHandler +import ru.quipy.common.utils.LeakingBucketRateLimiter +import ru.quipy.common.utils.NamedThreadFactory +import ru.quipy.common.utils.SlidingWindowRateLimiter import ru.quipy.core.EventSourcingService import ru.quipy.payments.api.PaymentAggregate import ru.quipy.payments.logic.PaymentAccountProperties @@ -17,8 +21,12 @@ import java.net.URI import java.net.http.HttpClient import java.net.http.HttpRequest import java.net.http.HttpResponse +import java.time.Duration import java.util.* +import java.util.concurrent.LinkedBlockingQueue import java.util.concurrent.Semaphore +import java.util.concurrent.ThreadPoolExecutor +import java.util.concurrent.TimeUnit @Configuration @@ -28,6 +36,8 @@ class PaymentAccountsConfig { private val mapper = ObjectMapper().registerKotlinModule().registerModules(JavaTimeModule()) } + var rateCheckWindow: Duration = Duration.ofMillis(1000) + @Value("\${payment.hostPort}") lateinit var paymentProviderHostPort: String @@ -41,23 +51,82 @@ class PaymentAccountsConfig { lateinit var allowedAccounts: List @Bean - fun accountAdapters( - paymentService: EventSourcingService, - meterRegistry: MeterRegistry, - ): List { + fun warehouseIfUnfinishedWork( + accountProperties: List, + @Value("\${payment.maximumPoolSize}") + maximumPoolSize: Int + ): ThreadPoolExecutor { + val poolSize = 100 + val temp = ThreadPoolExecutor( + poolSize, + poolSize, + 0, + TimeUnit.MILLISECONDS, + LinkedBlockingQueue(maximumPoolSize), + NamedThreadFactory("payment-submission-executor"), + CallerBlockingRejectedExecutionHandler() + ) + return temp + } + + @Bean + fun parallelLimiter( + accountProperties: List + ): Semaphore = + Semaphore(accountProperties.minOf { it.parallelRequests }) + + @Bean + fun burstRateLimiter( + accountProperties: List, + @Value("#{'\${payment.processingTimeMillis}'.split(',')}") + processingTimeMillis: Int + ): LeakingBucketRateLimiter = + LeakingBucketRateLimiter( + rate = accountProperties.minOf { it.rateLimitPerSec }.toLong(), + window = rateCheckWindow, + bucketSize = (((processingTimeMillis - accountProperties.maxOf { it.averageProcessingTime } + .toMillis()) / accountProperties.maxOf { it.averageProcessingTime } + .toMillis()) * accountProperties.minOf { it.rateLimitPerSec }.toLong()).toInt() + ) + + @Bean + fun smoothOutIncoming( + accountProperties: List, + ): SlidingWindowRateLimiter = + SlidingWindowRateLimiter( + rate = accountProperties.minOf { it.rateLimitPerSec }.toLong(), + window = rateCheckWindow, + ) + + + @Bean + fun accountProperties(): List { val request = HttpRequest.newBuilder() .uri(URI("http://${paymentProviderHostPort}/external/accounts?serviceName=$serviceName&token=$token")) + .timeout(Duration.ofSeconds(10)) .GET() .build() val resp = javaClient.send(request, HttpResponse.BodyHandlers.ofString()) - println("\nPayment accounts list:") - return mapper.readValue>( + val accounts: List = mapper.readValue>( resp.body(), - mapper.typeFactory.constructCollectionType(List::class.java, PaymentAccountProperties::class.java) - ) - .filter { it.accountName in allowedAccounts } + mapper.typeFactory.constructCollectionType( + List::class.java, + PaymentAccountProperties::class.java + ) + ).filter { it.accountName in allowedAccounts } + + return accounts + } + + @Bean + fun accountAdapters( + paymentService: EventSourcingService, + meterRegistry: MeterRegistry, + accountProperties: List + ): List { + return accountProperties .map { it.copy(enabled = true) } .onEach(::println) .map { @@ -67,7 +136,7 @@ class PaymentAccountsConfig { paymentProviderHostPort, token, meterRegistry, - Semaphore(it.parallelRequests) + parallelLimiter(accountProperties) ) } } diff --git a/src/main/kotlin/ru/quipy/payments/logic/OrderPayer.kt b/src/main/kotlin/ru/quipy/payments/logic/OrderPayer.kt index 359aa0a59..7c943d0ca 100644 --- a/src/main/kotlin/ru/quipy/payments/logic/OrderPayer.kt +++ b/src/main/kotlin/ru/quipy/payments/logic/OrderPayer.kt @@ -3,6 +3,7 @@ package ru.quipy.payments.logic import org.slf4j.Logger import org.slf4j.LoggerFactory import org.springframework.beans.factory.annotation.Autowired +import org.springframework.beans.factory.annotation.Qualifier import org.springframework.stereotype.Service import ru.quipy.common.utils.CallerBlockingRejectedExecutionHandler import ru.quipy.common.utils.NamedThreadFactory @@ -18,17 +19,15 @@ import java.util.concurrent.ThreadPoolExecutor import java.util.concurrent.TimeUnit @Service -class OrderPayer { +class OrderPayer( + val rateLimiter: SlidingWindowRateLimiter, + @Qualifier("warehouseIfUnfinishedWork") + val paymentExecutor: ThreadPoolExecutor +) { companion object { val logger: Logger = LoggerFactory.getLogger(OrderPayer::class.java) } - private val rateLimit: SlidingWindowRateLimiter by lazy { - SlidingWindowRateLimiter( - rate = 8, - window = Duration.ofMillis(1000), - ) - } @Autowired private lateinit var paymentESService: EventSourcingService @@ -36,19 +35,9 @@ class OrderPayer { @Autowired private lateinit var paymentService: PaymentService - private val paymentExecutor = ThreadPoolExecutor( - 16, - 16, - 0, - TimeUnit.MILLISECONDS, - LinkedBlockingQueue(8_000), - NamedThreadFactory("payment-submission-executor"), - CallerBlockingRejectedExecutionHandler() - ) - fun processPayment(orderId: UUID, amount: Int, paymentId: UUID, deadline: Long): Long { val createdAt = System.currentTimeMillis() - if (!rateLimit.tickBlocking(deadline- System.currentTimeMillis())) { + if (!rateLimiter.tickBlocking(deadline- System.currentTimeMillis())) { throw TooManyRequestsException(deadline) } paymentExecutor.submit { diff --git a/src/main/resources/application.properties b/src/main/resources/application.properties index 3170b6dad..0b511f2c3 100644 --- a/src/main/resources/application.properties +++ b/src/main/resources/application.properties @@ -26,6 +26,11 @@ payment.service-name=${PAYMENT_SERVICE_NAME} payment.token=${PAYMENT_TOKEN} # payment.accounts=${PAYMENT_ACCOUNTS:acc-12,acc-20} # payment.accounts=${PAYMENT_ACCOUNTS:acc-3} -payment.accounts=${PAYMENT_ACCOUNTS:acc-7} +#payment.accounts=${PAYMENT_ACCOUNTS:acc-7} # payment.accounts=${PAYMENT_ACCOUNTS:acc-18} payment.hostPort=${PAYMENT_HOST:localhost}:${PAYMENT_PORT:1234} + +# For 8 case +payment.accounts=${PAYMENT_ACCOUNTS:acc-9} +payment.processingTimeMillis=20000 +payment.maximumPoolSize=8000 diff --git a/test-local-run.http b/test-local-run.http index dc8dbeedf..9cde39e20 100644 --- a/test-local-run.http +++ b/test-local-run.http @@ -1,15 +1,41 @@ -### Run test locally +### Test 6 run... POST http://localhost:1234/test/run Content-Type: application/json { "serviceName": "{{serviceName}}", "token": "{{token}}", - "ratePerSecond": 1, - "testCount": 100, - "processingTimeMillis": 80000 + "ratePerSecond": 100, + "testCount": 5000, + "processingTimeMillis": 20000 } +### Test 7 run... +POST http://localhost:1234/test/run +Content-Type: application/json + +{ + "serviceName": "{{serviceName}}", + "token": "{{token}}", + "ratePerSecond": 100, + "testCount": 5000, + "processingTimeMillis": 20000 +} + +### Test 8 run... +POST http://localhost:1234/test/run +Content-Type: application/json + +{ + "serviceName": "{{serviceName}}", + "token": "{{token}}", + "ratePerSecond": 100, + "testCount": 5000, + "processingTimeMillis": 20000 +} + + + ### Stop running test to save time and resources # @timeout 120 -POST http://localhost:4321/test/stop/"{{serviceName}}" \ No newline at end of file +POST http://localhost:4321/test/stop/"{{serviceName}}"