diff --git a/.gitignore b/.gitignore index 259113f73..1c76414f1 100644 Binary files a/.gitignore and b/.gitignore differ diff --git a/src/main/kotlin/ru/quipy/apigateway/APIController.kt b/src/main/kotlin/ru/quipy/apigateway/APIController.kt index 6f23fa18d..2bdf631a6 100644 --- a/src/main/kotlin/ru/quipy/apigateway/APIController.kt +++ b/src/main/kotlin/ru/quipy/apigateway/APIController.kt @@ -55,14 +55,13 @@ class APIController { } @PostMapping("/orders/{orderId}/payment") - fun payOrder(@PathVariable orderId: UUID, @RequestParam deadline: Long): PaymentSubmissionDto { + suspend fun payOrder(@PathVariable orderId: UUID, @RequestParam deadline: Long): PaymentSubmissionDto { val paymentId = UUID.randomUUID() val order = orderRepository.findById(orderId)?.let { orderRepository.save(it.copy(status = OrderStatus.PAYMENT_IN_PROGRESS)) it } ?: throw IllegalArgumentException("No such order $orderId") - val createdAt = orderPayer.processPayment(orderId, order.price, paymentId, deadline) return PaymentSubmissionDto(createdAt, paymentId) } diff --git a/src/main/kotlin/ru/quipy/payments/logic/OrderPayer.kt b/src/main/kotlin/ru/quipy/payments/logic/OrderPayer.kt index a5909b85b..efac11f4d 100644 --- a/src/main/kotlin/ru/quipy/payments/logic/OrderPayer.kt +++ b/src/main/kotlin/ru/quipy/payments/logic/OrderPayer.kt @@ -1,15 +1,19 @@ package ru.quipy.payments.logic +import kotlinx.coroutines.delay import org.slf4j.Logger import org.slf4j.LoggerFactory import org.springframework.beans.factory.annotation.Autowired import org.springframework.stereotype.Service import ru.quipy.common.utils.CallerBlockingRejectedExecutionHandler import ru.quipy.common.utils.NamedThreadFactory +import ru.quipy.common.utils.SlidingWindowRateLimiter import ru.quipy.core.EventSourcingService import ru.quipy.payments.api.PaymentAggregate +import java.time.Duration import java.util.* import java.util.concurrent.LinkedBlockingQueue +import java.util.concurrent.Semaphore import java.util.concurrent.ThreadPoolExecutor import java.util.concurrent.TimeUnit @@ -26,30 +30,60 @@ class OrderPayer { @Autowired private lateinit var paymentService: PaymentService - private val paymentExecutor = ThreadPoolExecutor( - 16, - 16, - 0L, - TimeUnit.MILLISECONDS, - LinkedBlockingQueue(8_000), - NamedThreadFactory("payment-submission-executor"), - CallerBlockingRejectedExecutionHandler() - ) - - fun processPayment(orderId: UUID, amount: Int, paymentId: UUID, deadline: Long): Long { + @Autowired + private lateinit var accountAdapters: List + + private val accountProperties: PaymentAccountProperties by lazy { + accountAdapters.firstOrNull()?.getAccountProperties() + ?: throw IllegalStateException("No payment accounts configured") + } + + private val paymentExecutor: ThreadPoolExecutor by lazy { + ThreadPoolExecutor( + accountProperties.parallelRequests, + accountProperties.parallelRequests, + 0L, + TimeUnit.MILLISECONDS, + LinkedBlockingQueue(accountProperties.parallelRequests * 10), + NamedThreadFactory("payment-submission-executor"), + CallerBlockingRejectedExecutionHandler() + ) + } + + private val rateLimit: SlidingWindowRateLimiter by lazy { + SlidingWindowRateLimiter( + rate = accountProperties.rateLimitPerSec.toLong(), + window = Duration.ofSeconds(1), + ) + } + + private val parallelLimiter = Semaphore(5) + + suspend fun processPayment(orderId: UUID, amount: Int, paymentId: UUID, deadline: Long): Long { val createdAt = System.currentTimeMillis() - paymentExecutor.submit { - val createdEvent = paymentESService.create { - it.create( - paymentId, - orderId, - amount - ) + + parallelLimiter.acquire() + + return try { + while (!rateLimit.tick()) { + delay(100) } - logger.trace("Payment ${createdEvent.paymentId} for order $orderId created.") - paymentService.submitPaymentRequest(paymentId, amount, createdAt, deadline) + paymentExecutor.submit { + try { + val createdEvent = paymentESService.create { + it.create(paymentId, orderId, amount) + } + logger.trace("Payment {} for order {} created.", createdEvent.paymentId, orderId) + paymentService.submitPaymentRequest(paymentId, amount, createdAt, deadline) + } finally { + parallelLimiter.release() + } + } + createdAt + } catch (e: Exception) { + parallelLimiter.release() + throw e } - return createdAt } } \ No newline at end of file diff --git a/src/main/kotlin/ru/quipy/payments/logic/PaymentExternalServiceImpl.kt b/src/main/kotlin/ru/quipy/payments/logic/PaymentExternalServiceImpl.kt index 5cb12106a..905b3088b 100644 --- a/src/main/kotlin/ru/quipy/payments/logic/PaymentExternalServiceImpl.kt +++ b/src/main/kotlin/ru/quipy/payments/logic/PaymentExternalServiceImpl.kt @@ -36,6 +36,10 @@ class PaymentExternalSystemAdapterImpl( private val client = OkHttpClient.Builder().build() + override fun getAccountProperties(): PaymentAccountProperties { + return properties + } + override fun performPaymentAsync(paymentId: UUID, amount: Int, paymentStartedAt: Long, deadline: Long) { logger.warn("[$accountName] Submitting payment request for payment $paymentId") diff --git a/src/main/kotlin/ru/quipy/payments/logic/PaymentService.kt b/src/main/kotlin/ru/quipy/payments/logic/PaymentService.kt index 255db77dd..2907c7706 100644 --- a/src/main/kotlin/ru/quipy/payments/logic/PaymentService.kt +++ b/src/main/kotlin/ru/quipy/payments/logic/PaymentService.kt @@ -17,6 +17,8 @@ interface PaymentService { */ interface PaymentExternalSystemAdapter { + fun getAccountProperties(): PaymentAccountProperties + fun performPaymentAsync(paymentId: UUID, amount: Int, paymentStartedAt: Long, deadline: Long) fun name(): String @@ -32,13 +34,21 @@ interface PaymentExternalSystemAdapter { data class PaymentAccountProperties( val serviceName: String, val accountName: String, - val parallelRequests: Int, - val rateLimitPerSec: Int, - val price: Int, + val parallelRequests: Int, // 30 + val rateLimitPerSec: Int, // 10 + val price: Int, // 30 val averageProcessingTime: Duration = Duration.ofSeconds(11), val enabled: Boolean, ) +/* +#- parallelRequests=5 - означает, что провайдер разрешает вам в любой момент времени иметь не более 5 одновременных запросов от вас к нему для этого аккаунта +#- rateLimitPerSec=5 - означает, что провайдер разрешает вам каждую секунду отправлять к нему не более 5 запросов по этому аккаунту +#- price=30 - означает, что провайдер оплаты будет взымать за каждый успешный или неуспешный вызов 30 денежных единиц с вашего магазина. +#- averageProcessingTime=PTO.05S - провайдер оплаты сообщает вам, что в среднем время обработки одного запроса по этому аккаунту будет составлять около 50ms. + + */ + /** * Describes response from external service. */ diff --git a/src/main/resources/application.properties b/src/main/resources/application.properties index 33d51a58b..4e3ebfbc4 100644 --- a/src/main/resources/application.properties +++ b/src/main/resources/application.properties @@ -26,5 +26,7 @@ management.endpoints.web.exposure.include=info,health,prometheus,metrics payment.service-name=${PAYMENT_SERVICE_NAME} payment.token=${PAYMENT_TOKEN} -payment.accounts=${PAYMENT_ACCOUNTS:acc-12,acc-20} +# payment.accounts=${PAYMENT_ACCOUNTS:acc-12,acc-20} +# payment.accounts=${PAYMENT_ACCOUNTS:acc-3} +payment.accounts=${PAYMENT_ACCOUNTS:acc-5} payment.hostPort=${PAYMENT_HOST:localhost}:${PAYMENT_PORT:1234} \ No newline at end of file diff --git a/test-local-run.http b/test-local-run.http index 7be0e4f73..cabd61099 100644 --- a/test-local-run.http +++ b/test-local-run.http @@ -5,11 +5,31 @@ Content-Type: application/json { "serviceName": "{{serviceName}}", "token": "{{token}}", - "ratePerSecond": 1, + "ratePerSecond": 2, "testCount": 100, - "processingTimeMillis": 80000 + "processingTimeMillis": 60000 } ### Stop running test to save time and resources # @timeout 120 -POST http://localhost:1234/test/stop/{{serviceName}} \ No newline at end of file +POST http://localhost:1234/test/stop/{{serviceName}} + +# PaymentAccountProperties( +# serviceName=cas-m3404-05, +# accountName=acc-3, +# parallelRequests=30, +# rateLimitPerSec=10, +# price=30, +# averageProcessingTime=PT1S, +# enabled=true +# ) + +# PaymentAccountProperties( +# serviceName=cas-m3404-05, +# accountName=acc-5, +# parallelRequests=5, +# rateLimitPerSec=3, +# price=30, +# averageProcessingTime=PT4.9S, +# enabled=true +#)