diff --git a/src/main/kotlin/ru/quipy/apigateway/APIController.kt b/src/main/kotlin/ru/quipy/apigateway/APIController.kt index 6f23fa18d..2bdf631a6 100644 --- a/src/main/kotlin/ru/quipy/apigateway/APIController.kt +++ b/src/main/kotlin/ru/quipy/apigateway/APIController.kt @@ -55,14 +55,13 @@ class APIController { } @PostMapping("/orders/{orderId}/payment") - fun payOrder(@PathVariable orderId: UUID, @RequestParam deadline: Long): PaymentSubmissionDto { + suspend fun payOrder(@PathVariable orderId: UUID, @RequestParam deadline: Long): PaymentSubmissionDto { val paymentId = UUID.randomUUID() val order = orderRepository.findById(orderId)?.let { orderRepository.save(it.copy(status = OrderStatus.PAYMENT_IN_PROGRESS)) it } ?: throw IllegalArgumentException("No such order $orderId") - val createdAt = orderPayer.processPayment(orderId, order.price, paymentId, deadline) return PaymentSubmissionDto(createdAt, paymentId) } diff --git a/src/main/kotlin/ru/quipy/payments/logic/OrderPayer.kt b/src/main/kotlin/ru/quipy/payments/logic/OrderPayer.kt index a5909b85b..4ccb7b018 100644 --- a/src/main/kotlin/ru/quipy/payments/logic/OrderPayer.kt +++ b/src/main/kotlin/ru/quipy/payments/logic/OrderPayer.kt @@ -1,13 +1,17 @@ package ru.quipy.payments.logic +import kotlinx.coroutines.delay +import kotlinx.coroutines.sync.Semaphore import org.slf4j.Logger import org.slf4j.LoggerFactory import org.springframework.beans.factory.annotation.Autowired import org.springframework.stereotype.Service import ru.quipy.common.utils.CallerBlockingRejectedExecutionHandler import ru.quipy.common.utils.NamedThreadFactory +import ru.quipy.common.utils.SlidingWindowRateLimiter import ru.quipy.core.EventSourcingService import ru.quipy.payments.api.PaymentAggregate +import java.time.Duration import java.util.* import java.util.concurrent.LinkedBlockingQueue import java.util.concurrent.ThreadPoolExecutor @@ -27,29 +31,47 @@ class OrderPayer { private lateinit var paymentService: PaymentService private val paymentExecutor = ThreadPoolExecutor( - 16, - 16, + 30, + 30, 0L, TimeUnit.MILLISECONDS, - LinkedBlockingQueue(8_000), + LinkedBlockingQueue(150), NamedThreadFactory("payment-submission-executor"), CallerBlockingRejectedExecutionHandler() ) - fun processPayment(orderId: UUID, amount: Int, paymentId: UUID, deadline: Long): Long { + private val rateLimit = SlidingWindowRateLimiter( + rate = 10, + window = Duration.ofSeconds(1), + ) + + private val parallelLimiter = Semaphore(30) + + suspend fun processPayment(orderId: UUID, amount: Int, paymentId: UUID, deadline: Long): Long { val createdAt = System.currentTimeMillis() - paymentExecutor.submit { - val createdEvent = paymentESService.create { - it.create( - paymentId, - orderId, - amount - ) + + parallelLimiter.acquire() + + return try { + while(!rateLimit.tick()) { + delay(10) } - logger.trace("Payment ${createdEvent.paymentId} for order $orderId created.") - paymentService.submitPaymentRequest(paymentId, amount, createdAt, deadline) + paymentExecutor.submit { + try { + val createdEvent = paymentESService.create { + it.create(paymentId, orderId, amount) + } + logger.trace("Payment ${createdEvent.paymentId} for order $orderId created.") + paymentService.submitPaymentRequest(paymentId, amount, createdAt, deadline) + } finally { + parallelLimiter.release() + } + } + createdAt + } catch (e: Exception) { + parallelLimiter.release() + throw e } - return createdAt } } \ No newline at end of file diff --git a/src/main/kotlin/ru/quipy/payments/logic/PaymentService.kt b/src/main/kotlin/ru/quipy/payments/logic/PaymentService.kt index 255db77dd..3bf6cdbb5 100644 --- a/src/main/kotlin/ru/quipy/payments/logic/PaymentService.kt +++ b/src/main/kotlin/ru/quipy/payments/logic/PaymentService.kt @@ -32,13 +32,21 @@ interface PaymentExternalSystemAdapter { data class PaymentAccountProperties( val serviceName: String, val accountName: String, - val parallelRequests: Int, - val rateLimitPerSec: Int, - val price: Int, + val parallelRequests: Int, // 30 + val rateLimitPerSec: Int, // 10 + val price: Int, // 30 val averageProcessingTime: Duration = Duration.ofSeconds(11), val enabled: Boolean, ) +/* +#- parallelRequests=5 - означает, что провайдер разрешает вам в любой момент времени иметь не более 5 одновременных запросов от вас к нему для этого аккаунта +#- rateLimitPerSec=5 - означает, что провайдер разрешает вам каждую секунду отправлять к нему не более 5 запросов по этому аккаунту +#- price=30 - означает, что провайдер оплаты будет взымать за каждый успешный или неуспешный вызов 30 денежных единиц с вашего магазина. +#- averageProcessingTime=PTO.05S - провайдер оплаты сообщает вам, что в среднем время обработки одного запроса по этому аккаунту будет составлять около 50ms. + + */ + /** * Describes response from external service. */ diff --git a/src/main/resources/application.properties b/src/main/resources/application.properties index 33d51a58b..661d0c947 100644 --- a/src/main/resources/application.properties +++ b/src/main/resources/application.properties @@ -26,5 +26,6 @@ management.endpoints.web.exposure.include=info,health,prometheus,metrics payment.service-name=${PAYMENT_SERVICE_NAME} payment.token=${PAYMENT_TOKEN} -payment.accounts=${PAYMENT_ACCOUNTS:acc-12,acc-20} +#payment.accounts=${PAYMENT_ACCOUNTS:acc-12,acc-20} +payment.accounts=${PAYMENT_ACCOUNTS:acc-3} payment.hostPort=${PAYMENT_HOST:localhost}:${PAYMENT_PORT:1234} \ No newline at end of file diff --git a/test-local-run.http b/test-local-run.http index 7be0e4f73..644edf067 100644 --- a/test-local-run.http +++ b/test-local-run.http @@ -5,8 +5,8 @@ Content-Type: application/json { "serviceName": "{{serviceName}}", "token": "{{token}}", - "ratePerSecond": 1, - "testCount": 100, + "ratePerSecond": 11, + "testCount": 1200, "processingTimeMillis": 80000 }