Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Binary file modified .gitignore
Binary file not shown.
3 changes: 1 addition & 2 deletions src/main/kotlin/ru/quipy/apigateway/APIController.kt
Original file line number Diff line number Diff line change
Expand Up @@ -55,14 +55,13 @@ class APIController {
}

@PostMapping("/orders/{orderId}/payment")
fun payOrder(@PathVariable orderId: UUID, @RequestParam deadline: Long): PaymentSubmissionDto {
suspend fun payOrder(@PathVariable orderId: UUID, @RequestParam deadline: Long): PaymentSubmissionDto {
val paymentId = UUID.randomUUID()
val order = orderRepository.findById(orderId)?.let {
orderRepository.save(it.copy(status = OrderStatus.PAYMENT_IN_PROGRESS))
it
} ?: throw IllegalArgumentException("No such order $orderId")


val createdAt = orderPayer.processPayment(orderId, order.price, paymentId, deadline)
return PaymentSubmissionDto(createdAt, paymentId)
}
Expand Down
76 changes: 55 additions & 21 deletions src/main/kotlin/ru/quipy/payments/logic/OrderPayer.kt
Original file line number Diff line number Diff line change
@@ -1,15 +1,19 @@
package ru.quipy.payments.logic

import kotlinx.coroutines.delay
import org.slf4j.Logger
import org.slf4j.LoggerFactory
import org.springframework.beans.factory.annotation.Autowired
import org.springframework.stereotype.Service
import ru.quipy.common.utils.CallerBlockingRejectedExecutionHandler
import ru.quipy.common.utils.NamedThreadFactory
import ru.quipy.common.utils.SlidingWindowRateLimiter
import ru.quipy.core.EventSourcingService
import ru.quipy.payments.api.PaymentAggregate
import java.time.Duration
import java.util.*
import java.util.concurrent.LinkedBlockingQueue
import java.util.concurrent.Semaphore
import java.util.concurrent.ThreadPoolExecutor
import java.util.concurrent.TimeUnit

Expand All @@ -26,30 +30,60 @@ class OrderPayer {
@Autowired
private lateinit var paymentService: PaymentService

private val paymentExecutor = ThreadPoolExecutor(
16,
16,
0L,
TimeUnit.MILLISECONDS,
LinkedBlockingQueue(8_000),
NamedThreadFactory("payment-submission-executor"),
CallerBlockingRejectedExecutionHandler()
)

fun processPayment(orderId: UUID, amount: Int, paymentId: UUID, deadline: Long): Long {
@Autowired
private lateinit var accountAdapters: List<PaymentExternalSystemAdapter>

private val accountProperties: PaymentAccountProperties by lazy {
accountAdapters.firstOrNull()?.getAccountProperties()
?: throw IllegalStateException("No payment accounts configured")
}

private val paymentExecutor: ThreadPoolExecutor by lazy {
ThreadPoolExecutor(
accountProperties.parallelRequests,
accountProperties.parallelRequests,
0L,
TimeUnit.MILLISECONDS,
LinkedBlockingQueue(accountProperties.parallelRequests * 10),
NamedThreadFactory("payment-submission-executor"),
CallerBlockingRejectedExecutionHandler()
)
}

private val rateLimit: SlidingWindowRateLimiter by lazy {
SlidingWindowRateLimiter(
rate = accountProperties.rateLimitPerSec.toLong(),
window = Duration.ofSeconds(1),
)
}

private val parallelLimiter = Semaphore(5)

suspend fun processPayment(orderId: UUID, amount: Int, paymentId: UUID, deadline: Long): Long {
val createdAt = System.currentTimeMillis()
paymentExecutor.submit {
val createdEvent = paymentESService.create {
it.create(
paymentId,
orderId,
amount
)

parallelLimiter.acquire()

return try {
while (!rateLimit.tick()) {
delay(100)
}
logger.trace("Payment ${createdEvent.paymentId} for order $orderId created.")

paymentService.submitPaymentRequest(paymentId, amount, createdAt, deadline)
paymentExecutor.submit {
try {
val createdEvent = paymentESService.create {
it.create(paymentId, orderId, amount)
}
logger.trace("Payment {} for order {} created.", createdEvent.paymentId, orderId)
paymentService.submitPaymentRequest(paymentId, amount, createdAt, deadline)
} finally {
parallelLimiter.release()
}
}
createdAt
} catch (e: Exception) {
parallelLimiter.release()
throw e
}
return createdAt
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,10 @@ class PaymentExternalSystemAdapterImpl(

private val client = OkHttpClient.Builder().build()

override fun getAccountProperties(): PaymentAccountProperties {
return properties
}

override fun performPaymentAsync(paymentId: UUID, amount: Int, paymentStartedAt: Long, deadline: Long) {
logger.warn("[$accountName] Submitting payment request for payment $paymentId")

Expand Down
16 changes: 13 additions & 3 deletions src/main/kotlin/ru/quipy/payments/logic/PaymentService.kt
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@ interface PaymentService {

*/
interface PaymentExternalSystemAdapter {
fun getAccountProperties(): PaymentAccountProperties

fun performPaymentAsync(paymentId: UUID, amount: Int, paymentStartedAt: Long, deadline: Long)

fun name(): String
Expand All @@ -32,13 +34,21 @@ interface PaymentExternalSystemAdapter {
data class PaymentAccountProperties(
val serviceName: String,
val accountName: String,
val parallelRequests: Int,
val rateLimitPerSec: Int,
val price: Int,
val parallelRequests: Int, // 30
val rateLimitPerSec: Int, // 10
val price: Int, // 30
val averageProcessingTime: Duration = Duration.ofSeconds(11),
val enabled: Boolean,
)

/*
#- parallelRequests=5 - означает, что провайдер разрешает вам в любой момент времени иметь не более 5 одновременных запросов от вас к нему для этого аккаунта
#- rateLimitPerSec=5 - означает, что провайдер разрешает вам каждую секунду отправлять к нему не более 5 запросов по этому аккаунту
#- price=30 - означает, что провайдер оплаты будет взымать за каждый успешный или неуспешный вызов 30 денежных единиц с вашего магазина.
#- averageProcessingTime=PTO.05S - провайдер оплаты сообщает вам, что в среднем время обработки одного запроса по этому аккаунту будет составлять около 50ms.

*/

/**
* Describes response from external service.
*/
Expand Down
4 changes: 3 additions & 1 deletion src/main/resources/application.properties
Original file line number Diff line number Diff line change
Expand Up @@ -26,5 +26,7 @@ management.endpoints.web.exposure.include=info,health,prometheus,metrics

payment.service-name=${PAYMENT_SERVICE_NAME}
payment.token=${PAYMENT_TOKEN}
payment.accounts=${PAYMENT_ACCOUNTS:acc-12,acc-20}
# payment.accounts=${PAYMENT_ACCOUNTS:acc-12,acc-20}
# payment.accounts=${PAYMENT_ACCOUNTS:acc-3}
payment.accounts=${PAYMENT_ACCOUNTS:acc-5}
payment.hostPort=${PAYMENT_HOST:localhost}:${PAYMENT_PORT:1234}
26 changes: 23 additions & 3 deletions test-local-run.http
Original file line number Diff line number Diff line change
Expand Up @@ -5,11 +5,31 @@ Content-Type: application/json
{
"serviceName": "{{serviceName}}",
"token": "{{token}}",
"ratePerSecond": 1,
"ratePerSecond": 2,
"testCount": 100,
"processingTimeMillis": 80000
"processingTimeMillis": 60000
}

### Stop running test to save time and resources
# @timeout 120
POST http://localhost:1234/test/stop/{{serviceName}}
POST http://localhost:1234/test/stop/{{serviceName}}

# PaymentAccountProperties(
# serviceName=cas-m3404-05,
# accountName=acc-3,
# parallelRequests=30,
# rateLimitPerSec=10,
# price=30,
# averageProcessingTime=PT1S,
# enabled=true
# )

# PaymentAccountProperties(
# serviceName=cas-m3404-05,
# accountName=acc-5,
# parallelRequests=5,
# rateLimitPerSec=3,
# price=30,
# averageProcessingTime=PT4.9S,
# enabled=true
#)