diff --git a/src/main/kotlin/ru/quipy/apigateway/APIController.kt b/src/main/kotlin/ru/quipy/apigateway/APIController.kt index 251b8cdbc..1a92d7863 100644 --- a/src/main/kotlin/ru/quipy/apigateway/APIController.kt +++ b/src/main/kotlin/ru/quipy/apigateway/APIController.kt @@ -2,13 +2,26 @@ package ru.quipy.apigateway import org.slf4j.Logger import org.slf4j.LoggerFactory +import org.springframework.beans.factory.annotation.Qualifier import org.springframework.web.bind.annotation.* +import ru.quipy.common.utils.LeakingBucketRateLimiter +import ru.quipy.common.utils.RateLimiter +import ru.quipy.exceptions.TooManyRequestsException import ru.quipy.orders.repository.OrderRepository import ru.quipy.payments.logic.OrderPayer +import java.time.Duration import java.util.* +import java.util.concurrent.Semaphore +import java.util.concurrent.TimeUnit @RestController -class APIController(private val orderRepository: OrderRepository, private val orderPayer: OrderPayer) { +class APIController( + private val orderRepository: OrderRepository, + private val orderPayer: OrderPayer, + @field:Qualifier("parallelLimiter") + private val parallelLimiter: Semaphore, + private val rateLimiter: RateLimiter = LeakingBucketRateLimiter(10, Duration.ofSeconds(1), 20) +) { val logger: Logger = LoggerFactory.getLogger(APIController::class.java) @@ -49,6 +62,9 @@ class APIController(private val orderRepository: OrderRepository, private val or @PostMapping("/orders/{orderId}/payment") fun payOrder(@PathVariable orderId: UUID, @RequestParam deadline: Long): PaymentSubmissionDto { + if (!rateLimiter.tick() || !parallelLimiter.tryAcquire(1, TimeUnit.SECONDS)) { + throw TooManyRequestsException(deadline) + } val paymentId = UUID.randomUUID() val order = orderRepository.findById(orderId)?.let { orderRepository.save(it.copy(status = OrderStatus.PAYMENT_IN_PROGRESS)) diff --git a/src/main/kotlin/ru/quipy/apigateway/GlobalExceptionHandler.kt b/src/main/kotlin/ru/quipy/apigateway/GlobalExceptionHandler.kt index e32911ade..1fede48bf 100644 --- a/src/main/kotlin/ru/quipy/apigateway/GlobalExceptionHandler.kt +++ b/src/main/kotlin/ru/quipy/apigateway/GlobalExceptionHandler.kt @@ -6,20 +6,42 @@ import org.springframework.http.ResponseEntity import org.springframework.web.bind.annotation.ExceptionHandler import org.springframework.web.bind.annotation.RestControllerAdvice import ru.quipy.exceptions.TooManyRequestsException +import java.util.concurrent.atomic.AtomicInteger +import java.util.concurrent.atomic.AtomicLong @RestControllerAdvice class GlobalExceptionHandler( - private val maxWait: String = "3", ) { companion object { val logger = LoggerFactory.getLogger(GlobalExceptionHandler::class.java) + private var currentRetryAfterSeconds = 1 + private var exp_base = 2; + private const val MAX_RETRY_AFTER_MS = 256 + private const val RESET_WINDOW_MS = 30000L } + private val rejectedRequestsCount = AtomicInteger(0) + private val lastRejectionTime = AtomicLong(0) + @ExceptionHandler(TooManyRequestsException::class) - fun handleTooManyRequests(): ResponseEntity { - return ResponseEntity - .status(HttpStatus.TOO_MANY_REQUESTS) - .header("Retry-After", maxWait) - .build() + fun handleTooManyRequests(exception: TooManyRequestsException): ResponseEntity { + logger.warn("to many request") + val currentTime = System.currentTimeMillis() + val lastRejection = lastRejectionTime.get() + + if (currentTime - lastRejection > RESET_WINDOW_MS) { + rejectedRequestsCount.set(0) + currentRetryAfterSeconds = 1 + } + lastRejectionTime.set(currentTime) + + if (rejectedRequestsCount.get() < 30 && exception.deadline < System.currentTimeMillis() + 1200) { + return ResponseEntity + .status(HttpStatus.TOO_MANY_REQUESTS) + .header("Retry-After", MAX_RETRY_AFTER_MS.coerceAtLeast(currentRetryAfterSeconds).toString()) + .build() + } + + return ResponseEntity.status(200).build() } } \ No newline at end of file diff --git a/src/main/kotlin/ru/quipy/common/utils/LeakingBucketRateLimiter.kt b/src/main/kotlin/ru/quipy/common/utils/LeakingBucketRateLimiter.kt index 383c047f7..88db1eb39 100644 --- a/src/main/kotlin/ru/quipy/common/utils/LeakingBucketRateLimiter.kt +++ b/src/main/kotlin/ru/quipy/common/utils/LeakingBucketRateLimiter.kt @@ -25,7 +25,7 @@ class LeakingBucketRateLimiter( private val releaseJob = rateLimiterScope.launch { while (true) { delay(window.toMillis()) - for (i in 0..rate) { + for (i in 1..rate) { queue.poll() } } diff --git a/src/main/kotlin/ru/quipy/config/RpcControlConfig.kt b/src/main/kotlin/ru/quipy/config/RpcControlConfig.kt index bd37c79e2..ec2747436 100644 --- a/src/main/kotlin/ru/quipy/config/RpcControlConfig.kt +++ b/src/main/kotlin/ru/quipy/config/RpcControlConfig.kt @@ -3,20 +3,18 @@ package ru.quipy.config import org.springframework.beans.factory.annotation.Qualifier import org.springframework.context.annotation.Bean import org.springframework.context.annotation.Configuration +import ru.quipy.common.utils.CompositeRateLimiter +import ru.quipy.common.utils.RateLimiter import ru.quipy.common.utils.SlidingWindowRateLimiter +import ru.quipy.common.utils.TokenBucketRateLimiter import ru.quipy.payments.logic.PaymentAccountProperties import java.time.Duration import java.util.concurrent.Semaphore +import java.util.concurrent.TimeUnit @Configuration class RpcControlConfig { - @Bean - fun getSlidingWindowRateLimiter(accountProperties: PaymentAccountProperties) = - SlidingWindowRateLimiter(accountProperties.rateLimitPerSec.toLong(), - Duration.ofSeconds(1) - ) - @Bean @Qualifier("parallelLimiter") fun parallelLimiter(accountProperties: PaymentAccountProperties): Semaphore { diff --git a/src/main/kotlin/ru/quipy/exceptions/TooManyRequestsException.kt b/src/main/kotlin/ru/quipy/exceptions/TooManyRequestsException.kt index 8d887d9f7..cacd67810 100644 --- a/src/main/kotlin/ru/quipy/exceptions/TooManyRequestsException.kt +++ b/src/main/kotlin/ru/quipy/exceptions/TooManyRequestsException.kt @@ -1,3 +1,3 @@ package ru.quipy.exceptions -class TooManyRequestsException() : RuntimeException() \ No newline at end of file +class TooManyRequestsException(val deadline: Long) : RuntimeException() \ No newline at end of file diff --git a/src/main/kotlin/ru/quipy/payments/logic/OrderPayer.kt b/src/main/kotlin/ru/quipy/payments/logic/OrderPayer.kt index 2e8320429..fe035cd6a 100644 --- a/src/main/kotlin/ru/quipy/payments/logic/OrderPayer.kt +++ b/src/main/kotlin/ru/quipy/payments/logic/OrderPayer.kt @@ -6,15 +6,17 @@ import org.slf4j.Logger import org.slf4j.LoggerFactory import org.springframework.beans.factory.annotation.Qualifier import org.springframework.stereotype.Service +import ru.quipy.common.utils.CallerBlockingRejectedExecutionHandler import ru.quipy.common.utils.NamedThreadFactory import ru.quipy.common.utils.SlidingWindowRateLimiter import ru.quipy.core.EventSourcingService -import ru.quipy.exceptions.TooManyRequestsException import ru.quipy.payments.api.PaymentAggregate -import ru.quipy.payments.dto.Transaction import java.time.Duration import java.util.* -import java.util.concurrent.* +import java.util.concurrent.ArrayBlockingQueue +import java.util.concurrent.Semaphore +import java.util.concurrent.ThreadPoolExecutor +import java.util.concurrent.TimeUnit @Service class OrderPayer( @@ -43,51 +45,57 @@ class OrderPayer( TimeUnit.MILLISECONDS, ArrayBlockingQueue(accountProperties.parallelRequests), NamedThreadFactory("payment-submission-executor"), - ThreadPoolExecutor.AbortPolicy() + CallerBlockingRejectedExecutionHandler(maxWait = Duration.ofMillis(1500)) ) } private val rateLimit: SlidingWindowRateLimiter by lazy { SlidingWindowRateLimiter( rate = accountProperties.rateLimitPerSec.toLong(), - window = Duration.ofSeconds(1), + window = Duration.ofMillis(1000), ) } fun processPayment(orderId: UUID, amount: Int, paymentId: UUID, deadline: Long): Long { - val createdAt = System.currentTimeMillis() paymentProcessingPlannedCounter.increment() + while (!rateLimit.tick()) { + Thread.sleep(Random().nextLong(1, 10)) + } + val createdAt = System.currentTimeMillis() - val task = Runnable { - parallelLimiter.acquire() - while (!rateLimit.tick()) { - Thread.sleep(Random().nextInt(0, 10).toLong()) - } - paymentProcessingStartedCounter.increment() + paymentProcessingStartedCounter.increment() + paymentExecutor.submit { try { val createdEvent = paymentESService.create { it.create(paymentId, orderId, amount) } - logger.trace("Payment {} for order {} created.", createdEvent.paymentId, orderId) - paymentService.submitPaymentRequest( - paymentId, - amount, - createdAt, - deadline - ) + logger.trace("Payment ${createdEvent.paymentId} for order $orderId created.") + paymentService.submitPaymentRequest(paymentId, amount, createdAt, deadline) + } catch (e: Exception) { + parallelLimiter.release() + paymentProcessingCompletedCounter.increment() + throw e } finally { parallelLimiter.release() paymentProcessingCompletedCounter.increment() + } } + return createdAt + } +} - val transaction = Transaction(orderId, amount, paymentId, deadline, task) +/** + * "serviceName": "cas-m3404", + * "accountName": "acc-23", + * "parallelRequests": 64, + * "rateLimitPerSec": 11, + * "price": 30, + * "averageProcessingTime": "PT1S" - try { - paymentExecutor.execute(transaction) - return createdAt - } catch (_: RejectedExecutionException) { - throw TooManyRequestsException() - } - } -} \ No newline at end of file + * "accounts": "acc-23", + * "ratePerSecond": 15, + * "testCount": 3000, + * "processingTimeMillis": 2500 + * } + */ \ No newline at end of file diff --git a/src/main/kotlin/ru/quipy/payments/logic/PaymentExternalServiceImpl.kt b/src/main/kotlin/ru/quipy/payments/logic/PaymentExternalServiceImpl.kt index c0f4f84f5..5da9c9984 100644 --- a/src/main/kotlin/ru/quipy/payments/logic/PaymentExternalServiceImpl.kt +++ b/src/main/kotlin/ru/quipy/payments/logic/PaymentExternalServiceImpl.kt @@ -11,6 +11,7 @@ import ru.quipy.payments.api.PaymentAggregate import java.net.SocketTimeoutException import java.time.Duration import java.util.* +import kotlin.math.pow // Advice: always treat time as a Duration @@ -59,6 +60,9 @@ class PaymentExternalSystemAdapterImpl( post(emptyBody) }.build() + var isCompletedRequest = false + var retryCount = 0 + while (!isCompletedRequest && now() < deadline) { client.newCall(request).execute().use { response -> val body = try { mapper.readValue(response.body?.string(), ExternalSysResponse::class.java) @@ -67,6 +71,19 @@ class PaymentExternalSystemAdapterImpl( ExternalSysResponse(transactionId.toString(), paymentId.toString(), false, e.message) } + if (body.message?.contains("Temporary error") == true) { + if (retryCount < 7) { + retryCount++ + val backoffTime = (2.0.pow(retryCount.toDouble()) * 10 + Random().nextLong(0, 10)).toLong() + Thread.sleep(backoffTime) + continue + } else { + isCompletedRequest = true + } + } else { + isCompletedRequest = true + } + logger.warn("[$accountName] Payment processed for txId: $transactionId, payment: $paymentId, succeeded: ${body.result}, message: ${body.message}") // Здесь мы обновляем состояние оплаты в зависимости от результата в базе данных оплат. @@ -75,6 +92,7 @@ class PaymentExternalSystemAdapterImpl( it.logProcessing(body.result, now(), transactionId, reason = body.message) } } + } } catch (e: Exception) { when (e) { is SocketTimeoutException -> { @@ -83,7 +101,6 @@ class PaymentExternalSystemAdapterImpl( it.logProcessing(false, now(), transactionId, reason = "Request timeout.") } } - else -> { logger.error("[$accountName] Payment failed for txId: $transactionId, payment: $paymentId", e) diff --git a/src/main/resources/application.properties b/src/main/resources/application.properties index bb6025436..73440a6fd 100644 --- a/src/main/resources/application.properties +++ b/src/main/resources/application.properties @@ -23,6 +23,6 @@ payment.service-name=${PAYMENT_SERVICE_NAME} payment.token=${PAYMENT_TOKEN} # payment.accounts=${PAYMENT_ACCOUNTS:acc-12,acc-20} # payment.accounts=${PAYMENT_ACCOUNTS:acc-3} -payment.accounts=${PAYMENT_ACCOUNTS:acc-23} +payment.accounts=${PAYMENT_ACCOUNTS:acc-8} # payment.accounts=${PAYMENT_ACCOUNTS:acc-18} payment.hostPort=${PAYMENT_HOST:localhost}:${PAYMENT_PORT:1234} \ No newline at end of file