Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 1 addition & 2 deletions src/main/kotlin/ru/quipy/apigateway/APIController.kt
Original file line number Diff line number Diff line change
Expand Up @@ -55,14 +55,13 @@ class APIController {
}

@PostMapping("/orders/{orderId}/payment")
fun payOrder(@PathVariable orderId: UUID, @RequestParam deadline: Long): PaymentSubmissionDto {
suspend fun payOrder(@PathVariable orderId: UUID, @RequestParam deadline: Long): PaymentSubmissionDto {
val paymentId = UUID.randomUUID()
val order = orderRepository.findById(orderId)?.let {
orderRepository.save(it.copy(status = OrderStatus.PAYMENT_IN_PROGRESS))
it
} ?: throw IllegalArgumentException("No such order $orderId")


val createdAt = orderPayer.processPayment(orderId, order.price, paymentId, deadline)
return PaymentSubmissionDto(createdAt, paymentId)
}
Expand Down
50 changes: 36 additions & 14 deletions src/main/kotlin/ru/quipy/payments/logic/OrderPayer.kt
Original file line number Diff line number Diff line change
@@ -1,13 +1,17 @@
package ru.quipy.payments.logic

import kotlinx.coroutines.delay
import kotlinx.coroutines.sync.Semaphore
import org.slf4j.Logger
import org.slf4j.LoggerFactory
import org.springframework.beans.factory.annotation.Autowired
import org.springframework.stereotype.Service
import ru.quipy.common.utils.CallerBlockingRejectedExecutionHandler
import ru.quipy.common.utils.NamedThreadFactory
import ru.quipy.common.utils.SlidingWindowRateLimiter
import ru.quipy.core.EventSourcingService
import ru.quipy.payments.api.PaymentAggregate
import java.time.Duration
import java.util.*
import java.util.concurrent.LinkedBlockingQueue
import java.util.concurrent.ThreadPoolExecutor
Expand All @@ -27,29 +31,47 @@ class OrderPayer {
private lateinit var paymentService: PaymentService

private val paymentExecutor = ThreadPoolExecutor(
16,
16,
30,
30,
0L,
TimeUnit.MILLISECONDS,
LinkedBlockingQueue(8_000),
LinkedBlockingQueue(150),
NamedThreadFactory("payment-submission-executor"),
CallerBlockingRejectedExecutionHandler()
)

fun processPayment(orderId: UUID, amount: Int, paymentId: UUID, deadline: Long): Long {
private val rateLimit = SlidingWindowRateLimiter(
rate = 10,
window = Duration.ofSeconds(1),
)

private val parallelLimiter = Semaphore(30)

suspend fun processPayment(orderId: UUID, amount: Int, paymentId: UUID, deadline: Long): Long {
val createdAt = System.currentTimeMillis()
paymentExecutor.submit {
val createdEvent = paymentESService.create {
it.create(
paymentId,
orderId,
amount
)

parallelLimiter.acquire()

return try {
while(!rateLimit.tick()) {
delay(10)
}
logger.trace("Payment ${createdEvent.paymentId} for order $orderId created.")

paymentService.submitPaymentRequest(paymentId, amount, createdAt, deadline)
paymentExecutor.submit {
try {
val createdEvent = paymentESService.create {
it.create(paymentId, orderId, amount)
}
logger.trace("Payment ${createdEvent.paymentId} for order $orderId created.")
paymentService.submitPaymentRequest(paymentId, amount, createdAt, deadline)
} finally {
parallelLimiter.release()
}
}
createdAt
} catch (e: Exception) {
parallelLimiter.release()
throw e
}
return createdAt
}
}
14 changes: 11 additions & 3 deletions src/main/kotlin/ru/quipy/payments/logic/PaymentService.kt
Original file line number Diff line number Diff line change
Expand Up @@ -32,13 +32,21 @@ interface PaymentExternalSystemAdapter {
data class PaymentAccountProperties(
val serviceName: String,
val accountName: String,
val parallelRequests: Int,
val rateLimitPerSec: Int,
val price: Int,
val parallelRequests: Int, // 30
val rateLimitPerSec: Int, // 10
val price: Int, // 30
val averageProcessingTime: Duration = Duration.ofSeconds(11),
val enabled: Boolean,
)

/*
#- parallelRequests=5 - означает, что провайдер разрешает вам в любой момент времени иметь не более 5 одновременных запросов от вас к нему для этого аккаунта
#- rateLimitPerSec=5 - означает, что провайдер разрешает вам каждую секунду отправлять к нему не более 5 запросов по этому аккаунту
#- price=30 - означает, что провайдер оплаты будет взымать за каждый успешный или неуспешный вызов 30 денежных единиц с вашего магазина.
#- averageProcessingTime=PTO.05S - провайдер оплаты сообщает вам, что в среднем время обработки одного запроса по этому аккаунту будет составлять около 50ms.

*/

/**
* Describes response from external service.
*/
Expand Down
3 changes: 2 additions & 1 deletion src/main/resources/application.properties
Original file line number Diff line number Diff line change
Expand Up @@ -26,5 +26,6 @@ management.endpoints.web.exposure.include=info,health,prometheus,metrics

payment.service-name=${PAYMENT_SERVICE_NAME}
payment.token=${PAYMENT_TOKEN}
payment.accounts=${PAYMENT_ACCOUNTS:acc-12,acc-20}
#payment.accounts=${PAYMENT_ACCOUNTS:acc-12,acc-20}
payment.accounts=${PAYMENT_ACCOUNTS:acc-3}
payment.hostPort=${PAYMENT_HOST:localhost}:${PAYMENT_PORT:1234}
4 changes: 2 additions & 2 deletions test-local-run.http
Original file line number Diff line number Diff line change
Expand Up @@ -5,8 +5,8 @@ Content-Type: application/json
{
"serviceName": "{{serviceName}}",
"token": "{{token}}",
"ratePerSecond": 1,
"testCount": 100,
"ratePerSecond": 11,
"testCount": 1200,
"processingTimeMillis": 80000
}

Expand Down