Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
18 changes: 17 additions & 1 deletion src/main/kotlin/ru/quipy/apigateway/APIController.kt
Original file line number Diff line number Diff line change
Expand Up @@ -2,13 +2,26 @@ package ru.quipy.apigateway

import org.slf4j.Logger
import org.slf4j.LoggerFactory
import org.springframework.beans.factory.annotation.Qualifier
import org.springframework.web.bind.annotation.*
import ru.quipy.common.utils.LeakingBucketRateLimiter
import ru.quipy.common.utils.RateLimiter
import ru.quipy.exceptions.TooManyRequestsException
import ru.quipy.orders.repository.OrderRepository
import ru.quipy.payments.logic.OrderPayer
import java.time.Duration
import java.util.*
import java.util.concurrent.Semaphore
import java.util.concurrent.TimeUnit

@RestController
class APIController(private val orderRepository: OrderRepository, private val orderPayer: OrderPayer) {
class APIController(
private val orderRepository: OrderRepository,
private val orderPayer: OrderPayer,
@field:Qualifier("parallelLimiter")
private val parallelLimiter: Semaphore,
private val rateLimiter: RateLimiter = LeakingBucketRateLimiter(10, Duration.ofSeconds(1), 20)
) {

val logger: Logger = LoggerFactory.getLogger(APIController::class.java)

Expand Down Expand Up @@ -49,6 +62,9 @@ class APIController(private val orderRepository: OrderRepository, private val or

@PostMapping("/orders/{orderId}/payment")
fun payOrder(@PathVariable orderId: UUID, @RequestParam deadline: Long): PaymentSubmissionDto {
if (!rateLimiter.tick() || !parallelLimiter.tryAcquire(1, TimeUnit.SECONDS)) {
throw TooManyRequestsException(deadline)
}
val paymentId = UUID.randomUUID()
val order = orderRepository.findById(orderId)?.let {
orderRepository.save(it.copy(status = OrderStatus.PAYMENT_IN_PROGRESS))
Expand Down
34 changes: 28 additions & 6 deletions src/main/kotlin/ru/quipy/apigateway/GlobalExceptionHandler.kt
Original file line number Diff line number Diff line change
Expand Up @@ -6,20 +6,42 @@ import org.springframework.http.ResponseEntity
import org.springframework.web.bind.annotation.ExceptionHandler
import org.springframework.web.bind.annotation.RestControllerAdvice
import ru.quipy.exceptions.TooManyRequestsException
import java.util.concurrent.atomic.AtomicInteger
import java.util.concurrent.atomic.AtomicLong

@RestControllerAdvice
class GlobalExceptionHandler(
private val maxWait: String = "3",
) {
companion object {
val logger = LoggerFactory.getLogger(GlobalExceptionHandler::class.java)
private var currentRetryAfterSeconds = 1
private var exp_base = 2;
private const val MAX_RETRY_AFTER_MS = 256
private const val RESET_WINDOW_MS = 30000L
}

private val rejectedRequestsCount = AtomicInteger(0)
private val lastRejectionTime = AtomicLong(0)

@ExceptionHandler(TooManyRequestsException::class)
fun handleTooManyRequests(): ResponseEntity<String> {
return ResponseEntity
.status(HttpStatus.TOO_MANY_REQUESTS)
.header("Retry-After", maxWait)
.build()
fun handleTooManyRequests(exception: TooManyRequestsException): ResponseEntity<String> {
logger.warn("to many request")
val currentTime = System.currentTimeMillis()
val lastRejection = lastRejectionTime.get()

if (currentTime - lastRejection > RESET_WINDOW_MS) {
rejectedRequestsCount.set(0)
currentRetryAfterSeconds = 1
}
lastRejectionTime.set(currentTime)

if (rejectedRequestsCount.get() < 30 && exception.deadline < System.currentTimeMillis() + 1200) {
return ResponseEntity
.status(HttpStatus.TOO_MANY_REQUESTS)
.header("Retry-After", MAX_RETRY_AFTER_MS.coerceAtLeast(currentRetryAfterSeconds).toString())
.build()
}

return ResponseEntity.status(200).build()
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ class LeakingBucketRateLimiter(
private val releaseJob = rateLimiterScope.launch {
while (true) {
delay(window.toMillis())
for (i in 0..rate) {
for (i in 1..rate) {
queue.poll()
}
}
Expand Down
10 changes: 4 additions & 6 deletions src/main/kotlin/ru/quipy/config/RpcControlConfig.kt
Original file line number Diff line number Diff line change
Expand Up @@ -3,20 +3,18 @@ package ru.quipy.config
import org.springframework.beans.factory.annotation.Qualifier
import org.springframework.context.annotation.Bean
import org.springframework.context.annotation.Configuration
import ru.quipy.common.utils.CompositeRateLimiter
import ru.quipy.common.utils.RateLimiter
import ru.quipy.common.utils.SlidingWindowRateLimiter
import ru.quipy.common.utils.TokenBucketRateLimiter
import ru.quipy.payments.logic.PaymentAccountProperties
import java.time.Duration
import java.util.concurrent.Semaphore
import java.util.concurrent.TimeUnit

@Configuration
class RpcControlConfig {

@Bean
fun getSlidingWindowRateLimiter(accountProperties: PaymentAccountProperties) =
SlidingWindowRateLimiter(accountProperties.rateLimitPerSec.toLong(),
Duration.ofSeconds(1)
)

@Bean
@Qualifier("parallelLimiter")
fun parallelLimiter(accountProperties: PaymentAccountProperties): Semaphore {
Expand Down
Original file line number Diff line number Diff line change
@@ -1,3 +1,3 @@
package ru.quipy.exceptions

class TooManyRequestsException() : RuntimeException()
class TooManyRequestsException(val deadline: Long) : RuntimeException()
64 changes: 36 additions & 28 deletions src/main/kotlin/ru/quipy/payments/logic/OrderPayer.kt
Original file line number Diff line number Diff line change
Expand Up @@ -6,15 +6,17 @@ import org.slf4j.Logger
import org.slf4j.LoggerFactory
import org.springframework.beans.factory.annotation.Qualifier
import org.springframework.stereotype.Service
import ru.quipy.common.utils.CallerBlockingRejectedExecutionHandler
import ru.quipy.common.utils.NamedThreadFactory
import ru.quipy.common.utils.SlidingWindowRateLimiter
import ru.quipy.core.EventSourcingService
import ru.quipy.exceptions.TooManyRequestsException
import ru.quipy.payments.api.PaymentAggregate
import ru.quipy.payments.dto.Transaction
import java.time.Duration
import java.util.*
import java.util.concurrent.*
import java.util.concurrent.ArrayBlockingQueue
import java.util.concurrent.Semaphore
import java.util.concurrent.ThreadPoolExecutor
import java.util.concurrent.TimeUnit

@Service
class OrderPayer(
Expand Down Expand Up @@ -43,51 +45,57 @@ class OrderPayer(
TimeUnit.MILLISECONDS,
ArrayBlockingQueue<Runnable>(accountProperties.parallelRequests),
NamedThreadFactory("payment-submission-executor"),
ThreadPoolExecutor.AbortPolicy()
CallerBlockingRejectedExecutionHandler(maxWait = Duration.ofMillis(1500))
)
}

private val rateLimit: SlidingWindowRateLimiter by lazy {
SlidingWindowRateLimiter(
rate = accountProperties.rateLimitPerSec.toLong(),
window = Duration.ofSeconds(1),
window = Duration.ofMillis(1000),
)
}

fun processPayment(orderId: UUID, amount: Int, paymentId: UUID, deadline: Long): Long {
val createdAt = System.currentTimeMillis()
paymentProcessingPlannedCounter.increment()
while (!rateLimit.tick()) {
Thread.sleep(Random().nextLong(1, 10))
}
val createdAt = System.currentTimeMillis()

val task = Runnable {
parallelLimiter.acquire()
while (!rateLimit.tick()) {
Thread.sleep(Random().nextInt(0, 10).toLong())
}
paymentProcessingStartedCounter.increment()
paymentProcessingStartedCounter.increment()
paymentExecutor.submit {
try {
val createdEvent = paymentESService.create {
it.create(paymentId, orderId, amount)
}
logger.trace("Payment {} for order {} created.", createdEvent.paymentId, orderId)
paymentService.submitPaymentRequest(
paymentId,
amount,
createdAt,
deadline
)
logger.trace("Payment ${createdEvent.paymentId} for order $orderId created.")
paymentService.submitPaymentRequest(paymentId, amount, createdAt, deadline)
} catch (e: Exception) {
parallelLimiter.release()
paymentProcessingCompletedCounter.increment()
throw e
} finally {
parallelLimiter.release()
paymentProcessingCompletedCounter.increment()

}
}
return createdAt
}
}

val transaction = Transaction(orderId, amount, paymentId, deadline, task)
/**
* "serviceName": "cas-m3404",
* "accountName": "acc-23",
* "parallelRequests": 64,
* "rateLimitPerSec": 11,
* "price": 30,
* "averageProcessingTime": "PT1S"

try {
paymentExecutor.execute(transaction)
return createdAt
} catch (_: RejectedExecutionException) {
throw TooManyRequestsException()
}
}
}
* "accounts": "acc-23",
* "ratePerSecond": 15,
* "testCount": 3000,
* "processingTimeMillis": 2500
* }
*/
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import ru.quipy.payments.api.PaymentAggregate
import java.net.SocketTimeoutException
import java.time.Duration
import java.util.*
import kotlin.math.pow


// Advice: always treat time as a Duration
Expand Down Expand Up @@ -59,6 +60,9 @@ class PaymentExternalSystemAdapterImpl(
post(emptyBody)
}.build()

var isCompletedRequest = false
var retryCount = 0
while (!isCompletedRequest && now() < deadline) {
client.newCall(request).execute().use { response ->
val body = try {
mapper.readValue(response.body?.string(), ExternalSysResponse::class.java)
Expand All @@ -67,6 +71,19 @@ class PaymentExternalSystemAdapterImpl(
ExternalSysResponse(transactionId.toString(), paymentId.toString(), false, e.message)
}

if (body.message?.contains("Temporary error") == true) {
if (retryCount < 7) {
retryCount++
val backoffTime = (2.0.pow(retryCount.toDouble()) * 10 + Random().nextLong(0, 10)).toLong()
Thread.sleep(backoffTime)
continue
} else {
isCompletedRequest = true
}
} else {
isCompletedRequest = true
}

logger.warn("[$accountName] Payment processed for txId: $transactionId, payment: $paymentId, succeeded: ${body.result}, message: ${body.message}")

// Здесь мы обновляем состояние оплаты в зависимости от результата в базе данных оплат.
Expand All @@ -75,6 +92,7 @@ class PaymentExternalSystemAdapterImpl(
it.logProcessing(body.result, now(), transactionId, reason = body.message)
}
}
}
} catch (e: Exception) {
when (e) {
is SocketTimeoutException -> {
Expand All @@ -83,7 +101,6 @@ class PaymentExternalSystemAdapterImpl(
it.logProcessing(false, now(), transactionId, reason = "Request timeout.")
}
}

else -> {
logger.error("[$accountName] Payment failed for txId: $transactionId, payment: $paymentId", e)

Expand Down
2 changes: 1 addition & 1 deletion src/main/resources/application.properties
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,6 @@ payment.service-name=${PAYMENT_SERVICE_NAME}
payment.token=${PAYMENT_TOKEN}
# payment.accounts=${PAYMENT_ACCOUNTS:acc-12,acc-20}
# payment.accounts=${PAYMENT_ACCOUNTS:acc-3}
payment.accounts=${PAYMENT_ACCOUNTS:acc-23}
payment.accounts=${PAYMENT_ACCOUNTS:acc-8}
# payment.accounts=${PAYMENT_ACCOUNTS:acc-18}
payment.hostPort=${PAYMENT_HOST:localhost}:${PAYMENT_PORT:1234}