diff --git a/src/main/kotlin/ru/quipy/apigateway/APIController.kt b/src/main/kotlin/ru/quipy/apigateway/APIController.kt index 6a3ab4966..251ee553e 100644 --- a/src/main/kotlin/ru/quipy/apigateway/APIController.kt +++ b/src/main/kotlin/ru/quipy/apigateway/APIController.kt @@ -11,6 +11,7 @@ import ru.quipy.orders.repository.OrderRepository import ru.quipy.payments.logic.OrderPayer import java.time.Duration import java.util.* +import kotlin.random.Random @RestController class APIController( @@ -23,7 +24,7 @@ class APIController( val logger: Logger = LoggerFactory.getLogger(APIController::class.java) @PostMapping("/users") - suspend fun createUser(@RequestBody req: CreateUserRequest): User { + fun createUser(@RequestBody req: CreateUserRequest): User { return User(UUID.randomUUID(), req.name) } @@ -32,7 +33,7 @@ class APIController( data class User(val id: UUID, val name: String) @PostMapping("/orders") - suspend fun createOrder(@RequestParam userId: UUID, @RequestParam price: Int): Order { + fun createOrder(@RequestParam userId: UUID, @RequestParam price: Int): Order { val order = Order( UUID.randomUUID(), userId, @@ -58,9 +59,10 @@ class APIController( } @PostMapping("/orders/{orderId}/payment") - suspend fun payOrder(@PathVariable orderId: UUID, @RequestParam deadline: Long): PaymentSubmissionDto { + fun payOrder(@PathVariable orderId: UUID, @RequestParam deadline: Long): PaymentSubmissionDto { if (!rateLimiter.tick()) { - throw TooManyRequestsException(deadline) + val retryAfterMs = 10L + Random.nextLong(10) + throw TooManyRequestsException(retryAfterMs) } val paymentId = UUID.randomUUID() val order = orderRepository.findById(orderId)?.let { diff --git a/src/main/kotlin/ru/quipy/apigateway/GlobalExceptionHandler.kt b/src/main/kotlin/ru/quipy/apigateway/GlobalExceptionHandler.kt index 5679ea372..ea33b9e32 100644 --- a/src/main/kotlin/ru/quipy/apigateway/GlobalExceptionHandler.kt +++ b/src/main/kotlin/ru/quipy/apigateway/GlobalExceptionHandler.kt @@ -26,8 +26,13 @@ class GlobalExceptionHandler( fun handleTooManyRequests(exception: TooLongRequestException): ResponseEntity { return ResponseEntity.status(200).body("your request very long, i am so sorry") } + @ExceptionHandler(TooManyRequestsException::class) fun handleTooManyRequestsRetriable(exception: TooManyRequestsException): ResponseEntity { - return ResponseEntity.status(HttpStatus.TOO_MANY_REQUESTS).header("Retry-After", "1").body("to many requests") + val retryAfterSeconds = (exception.retryAfterMs / 1000.0).coerceAtLeast(0.1) + return ResponseEntity + .status(HttpStatus.TOO_MANY_REQUESTS) + .header("Retry-After", retryAfterSeconds.toString()) + .body("too many requests") } } \ No newline at end of file diff --git a/src/main/kotlin/ru/quipy/common/utils/SlidingWindowRateLimiter.kt b/src/main/kotlin/ru/quipy/common/utils/SlidingWindowRateLimiter.kt index 4537b42a8..4f3f64050 100644 --- a/src/main/kotlin/ru/quipy/common/utils/SlidingWindowRateLimiter.kt +++ b/src/main/kotlin/ru/quipy/common/utils/SlidingWindowRateLimiter.kt @@ -1,7 +1,5 @@ package ru.quipy.common.utils -import com.github.dockerjava.zerodep.shaded.org.apache.hc.core5.util.Deadline -import com.github.dockerjava.zerodep.shaded.org.apache.hc.core5.util.Timeout import kotlinx.coroutines.CoroutineScope import kotlinx.coroutines.asCoroutineDispatcher import kotlinx.coroutines.delay @@ -33,18 +31,26 @@ class SlidingWindowRateLimiter( } } - fun tickBlocking() { + fun tickBlockingAsync() { while (!tick()) { Thread.sleep(10) } } - suspend fun tickBlocking(timeout: Long): Boolean { + fun tickBlockingWithTimeout(timeout: Long): Boolean { val timeStarted = System.currentTimeMillis() - while (System.currentTimeMillis()-timeStarted < timeout && !tick()) { + while (System.currentTimeMillis() - timeStarted < timeout && !tick()) { + Thread.sleep(2) + } + return System.currentTimeMillis() - timeStarted < timeout + } + + suspend fun tickBlockingAsync(timeout: Long): Boolean { + val timeStarted = System.currentTimeMillis() + while (System.currentTimeMillis() - timeStarted < timeout && !tick()) { delay(2) } - return System.currentTimeMillis()-timeStarted < timeout + return System.currentTimeMillis() - timeStarted < timeout } diff --git a/src/main/kotlin/ru/quipy/exceptions/TooManyRequestsException.kt b/src/main/kotlin/ru/quipy/exceptions/TooManyRequestsException.kt index cacd67810..3f063849a 100644 --- a/src/main/kotlin/ru/quipy/exceptions/TooManyRequestsException.kt +++ b/src/main/kotlin/ru/quipy/exceptions/TooManyRequestsException.kt @@ -1,3 +1,3 @@ package ru.quipy.exceptions -class TooManyRequestsException(val deadline: Long) : RuntimeException() \ No newline at end of file +class TooManyRequestsException(val retryAfterMs: Long) : RuntimeException() \ No newline at end of file diff --git a/src/main/kotlin/ru/quipy/payments/logic/OrderPayer.kt b/src/main/kotlin/ru/quipy/payments/logic/OrderPayer.kt index 8861311a1..e2bd9e203 100644 --- a/src/main/kotlin/ru/quipy/payments/logic/OrderPayer.kt +++ b/src/main/kotlin/ru/quipy/payments/logic/OrderPayer.kt @@ -26,7 +26,7 @@ class OrderPayer(meterRegistry: MeterRegistry) { private lateinit var paymentService: PaymentService - suspend fun processPayment(orderId: UUID, amount: Int, paymentId: UUID, deadline: Long): Long { + fun processPayment(orderId: UUID, amount: Int, paymentId: UUID, deadline: Long): Long { val createdAt = System.currentTimeMillis() plannedRequests.increment() val createdEvent = paymentESService.create { diff --git a/src/main/kotlin/ru/quipy/payments/logic/PaymentExternalServiceImpl.kt b/src/main/kotlin/ru/quipy/payments/logic/PaymentExternalServiceImpl.kt index 0f9475fc2..598992345 100644 --- a/src/main/kotlin/ru/quipy/payments/logic/PaymentExternalServiceImpl.kt +++ b/src/main/kotlin/ru/quipy/payments/logic/PaymentExternalServiceImpl.kt @@ -2,11 +2,16 @@ package ru.quipy.payments.logic import com.fasterxml.jackson.databind.ObjectMapper import com.fasterxml.jackson.module.kotlin.registerKotlinModule -import com.github.dockerjava.api.model.Link import io.micrometer.core.instrument.MeterRegistry -import kotlinx.coroutines.delay +import kotlinx.coroutines.CoroutineDispatcher +import kotlinx.coroutines.CoroutineExceptionHandler +import kotlinx.coroutines.CoroutineScope +import kotlinx.coroutines.SupervisorJob +import kotlinx.coroutines.asCoroutineDispatcher +import kotlinx.coroutines.launch import kotlinx.coroutines.sync.Semaphore import org.slf4j.LoggerFactory +import ru.quipy.common.utils.NamedThreadFactory import ru.quipy.common.utils.SlidingWindowRateLimiter import ru.quipy.core.EventSourcingService import ru.quipy.exceptions.TooManyRequestsException @@ -22,6 +27,7 @@ import java.util.* import java.util.concurrent.Executors import java.util.concurrent.TimeUnit import kotlin.math.pow +import kotlin.random.Random // Advice: always treat time as a Duration @@ -31,17 +37,29 @@ class PaymentExternalSystemAdapterImpl( private val paymentProviderHostPort: String, private val token: String, meterRegistry: MeterRegistry, - private val parallelLimiter: Semaphore + private val parallelLimiter: Semaphore, + private val ioDispatcher: CoroutineDispatcher = Executors.newFixedThreadPool( + Runtime.getRuntime().availableProcessors() * 2, + NamedThreadFactory("payment-io-") + ).asCoroutineDispatcher() ) : PaymentExternalSystemAdapter { companion object { val logger = LoggerFactory.getLogger(PaymentExternalSystemAdapter::class.java) val mapper = ObjectMapper().registerKotlinModule() } - private val remaining = 20_000L + private val exceptionHandler = CoroutineExceptionHandler { _, throwable -> + logger.error("[$accountName] Unhandled exception in payment adapter coroutine", throwable) + } + + private val scope = CoroutineScope(SupervisorJob() + ioDispatcher + exceptionHandler) + + // 2025-11-20T20:30:35.780+03:00 INFO 56644 --- [alhost:1234/...] ru.quipy.core.EventSourcingService : Optimistic lock exception. Failed to save event records id: [7dca693e-e811-4b7f-8bce-23e13d952c04-4] - private val startedRequests = meterRegistry.counter("payment.processing.started", "accountName", properties.accountName) - private val timer = meterRegistry.timer("payment.external.system.request.latency", "accountName", properties.accountName) + private val startedRequests = + meterRegistry.counter("payment.processing.started", "accountName", properties.accountName) + private val timer = + meterRegistry.timer("payment.external.system.request.latency", "accountName", properties.accountName) private val serviceName = properties.serviceName private val accountName = properties.accountName private val requestAverageProcessingTime = properties.averageProcessingTime @@ -50,7 +68,7 @@ class PaymentExternalSystemAdapterImpl( private val rateLimit: SlidingWindowRateLimiter by lazy { SlidingWindowRateLimiter( - rate = rateLimitPerSec.toLong(), + rate = (rateLimitPerSec * 0.95).toLong(), window = Duration.ofMillis(1000), ) } @@ -61,35 +79,78 @@ class PaymentExternalSystemAdapterImpl( .version(HttpClient.Version.HTTP_2) .build() + private val retryScheduler = Executors.newScheduledThreadPool( + Runtime.getRuntime().availableProcessors() + ) - override suspend fun performPaymentAsync(paymentId: UUID, amount: Int, paymentStartedAt: Long, deadline: Long) { + override fun performPaymentAsync(paymentId: UUID, amount: Int, paymentStartedAt: Long, deadline: Long) { logger.warn("[$accountName] Submitting payment request for payment $paymentId") val transactionId = UUID.randomUUID() // Вне зависимости от исхода оплаты важно отметить что она была отправлена. // Это требуется сделать ВО ВСЕХ СЛУЧАЯХ, поскольку эта информация используется сервисом тестирования. - paymentESService.update(paymentId) { - it.logSubmission(success = true, transactionId, now(), Duration.ofMillis(now() - paymentStartedAt)) + scope.launch { + try { + paymentESService.update(paymentId) { + it.logSubmission( + success = true, + transactionId, + now(), + Duration.ofMillis(now() - paymentStartedAt) + ) + } + logger.info("[$accountName] Log submission recorded for $paymentId") + } catch (e: Exception) { + logger.error("[$accountName] Failed to record log submission for $paymentId", e) + } } logger.info("[$accountName] Submit: $paymentId , txId: $transactionId") + val remaining = deadline - now() - if (remaining <= 0) { + val minRequiredTime = requestAverageProcessingTime.toMillis() * 2 + if (remaining < minRequiredTime) { + logger.warn("[$accountName] Not enough time for payment $paymentId: ${remaining}ms remaining, need ${minRequiredTime}ms") paymentESService.update(paymentId) { - it.logProcessing(false, now(), transactionId, "deadline expired") + it.logProcessing(false, now(), transactionId, "not enough time") } - return + val retryAfterMs = minRequiredTime - remaining + Random.nextLong(100) + throw TooManyRequestsException(retryAfterMs) } val timeBeforeCall = now() - remaining.coerceAtMost(this.remaining) - tryAcquire(now(), remaining) - if (!rateLimit.tickBlocking(deadline- now())) { - throw TooManyRequestsException(deadline) + + if (!tryAcquire(now(), deadline - now())) { + val retryAfterMs = (requestAverageProcessingTime.toMillis() / parallelRequests * 10).coerceIn( + 10, + 100 + ) + Random.nextLong(10) + + throw TooManyRequestsException(retryAfterMs) + } + + if (!rateLimit.tickBlockingWithTimeout(deadline - now())) { + parallelLimiter.release() + + val retryAfterMs = (1000L / rateLimitPerSec * 10).coerceIn(10, 100) + Random.nextLong(10) + throw TooManyRequestsException(retryAfterMs) + } + + val requestTimeout = minOf( + deadline - now(), + requestAverageProcessingTime.toMillis() * 2 + ).coerceAtLeast(100) + + if (requestTimeout < requestAverageProcessingTime.toMillis()) { + logger.warn("[$accountName] Timeout too short for payment $paymentId: ${requestTimeout}ms") + parallelLimiter.release() + val retryAfterMs = requestAverageProcessingTime.toMillis() - requestTimeout + Random.nextLong(100) + throw TooManyRequestsException(retryAfterMs) } + val request = HttpRequest.newBuilder() .uri(URI("http://$paymentProviderHostPort/external/process?serviceName=$serviceName&token=$token&accountName=$accountName&transactionId=$transactionId&paymentId=$paymentId&amount=$amount")) - .timeout(Duration.ofMillis(30_000)) + .timeout(Duration.ofMillis(requestTimeout)) .POST(HttpRequest.BodyPublishers.noBody()) .build() @@ -106,73 +167,67 @@ class PaymentExternalSystemAdapterImpl( fun tryAcquire(startedAt: Long, remaining: Long): Boolean { var isAcquired = parallelLimiter.tryAcquire() - while (!isAcquired && now()-startedAt < remaining) { + while (!isAcquired && now() - startedAt < remaining) { isAcquired = parallelLimiter.tryAcquire() + Thread.sleep(1) } return isAcquired } - fun completeAction(retryCount: Long, request: HttpRequest, paymentId: UUID, transactionId: UUID, timeBeforeCall: Long, deadline: Long) { + fun completeAction( + retryCount: Long, + request: HttpRequest, + paymentId: UUID, + transactionId: UUID, + timeBeforeCall: Long, + deadline: Long + ) { httpClient.sendAsync(request, HttpResponse.BodyHandlers.ofString()) .whenComplete { response, throwable -> - if (throwable != null) { - val e = throwable.cause - when (throwable.cause) { - is SocketTimeoutException -> { - logger.warn("[$accountName] attempt ${retryCount + 1} timeout: $paymentId", e) - paymentESService.update(paymentId) { - it.logProcessing(false, now(), transactionId, "socket timeout") + scope.launch { + try { + if (throwable != null) { + val e = throwable.cause + when (throwable.cause) { + is SocketTimeoutException -> { + logger.warn("[$accountName] attempt ${retryCount + 1} timeout: $paymentId", e) } - } - is InterruptedIOException -> { - logger.warn("[$accountName] interrupted: $paymentId", e) - - // Здесь мы обновляем состояние оплаты в зависимости от результата в базе данных оплат. - // Это требуется сделать ВО ВСЕХ ИСХОДАХ (успешная оплата / неуспешная / ошибочная ситуация) - paymentESService.update(paymentId) { - it.logProcessing(false, now(), transactionId, "interrupted IO") + is InterruptedIOException -> { + logger.warn("[$accountName] interrupted: $paymentId", e) } - } - else -> { - logger.warn("[$accountName] io error: $paymentId", e) - paymentESService.update(paymentId) { - it.logProcessing(false, now(), transactionId, "io exception") + else -> { + logger.warn("[$accountName] io error: $paymentId", e) } } - } - if (retryCount + 1 >= 3) { - paymentESService.update(paymentId) { - it.logProcessing(false, now(), transactionId, "Max attempts reached") - } - startedRequests.increment() - parallelLimiter.release() - } else { - val backoff = ((2.0.pow(retryCount.toDouble()) * 25).toLong() + kotlin.random.Random.nextLong(10)) - val capped = backoff.coerceAtMost(deadline - now() - 5) - if (capped <= 0) { + if (retryCount + 1 >= 3) { paymentESService.update(paymentId) { - it.logProcessing(false, now(), transactionId, "Deadline expired") + it.logProcessing(false, now(), transactionId, "Max attempts reached") } - startedRequests.increment() } else { - Thread.sleep(backoff) - completeAction( - retryCount + 1, - request, - paymentId, - transactionId, - timeBeforeCall, - deadline - ) + val backoff = ((2.0.pow(retryCount.toDouble()) * 25).toLong() + Random.nextLong(10)) + val capped = backoff.coerceAtMost(deadline - now() - 5) + if (capped <= 0) { + paymentESService.update(paymentId) { + it.logProcessing(false, now(), transactionId, "Deadline expired") + } + } else { + scheduleRetry( + retryCount, + request, + paymentId, + transactionId, + timeBeforeCall, + deadline, + capped + ) + return@launch + } } - } - } else { - startedRequests.increment() - try { + } else { logger.warn("Free space in semaphore: {}", parallelLimiter.availablePermits) logger.info( "success in callback for payment: {}, retry count: {}, in time: {}", @@ -190,13 +245,50 @@ class PaymentExternalSystemAdapterImpl( paymentESService.update(paymentId) { it.logProcessing(parsed.result, now(), transactionId, parsed.message) } - } finally { - parallelLimiter.release() timer.record(now() - timeBeforeCall, TimeUnit.MILLISECONDS) } + } catch (e: Exception) { + logger.error("[$accountName] Error processing payment $paymentId", e) + } finally { + startedRequests.increment() + parallelLimiter.release() + } + } + } + } + + private fun scheduleRetry( + retryCount: Long, request: HttpRequest, paymentId: UUID, + transactionId: UUID, timeBeforeCall: Long, deadline: Long, delay: Long + ) { + retryScheduler.schedule({ + val remainingTime = deadline - now() + if (remainingTime < requestAverageProcessingTime.toMillis()) { + scope.launch { + try { + paymentESService.update(paymentId) { + it.logProcessing(false, now(), transactionId, "Not enough time for retry") + } + } catch (e: Exception) { + logger.error("[$accountName] Failed to record retry failure for $paymentId", e) + } finally { + startedRequests.increment() + parallelLimiter.release() } } + return@schedule } + + val newRequestTimeout = remainingTime.coerceIn(100, requestAverageProcessingTime.toMillis() * 2) + val newRequest = HttpRequest.newBuilder() + .uri(request.uri()) + .timeout(Duration.ofMillis(newRequestTimeout)) + .POST(HttpRequest.BodyPublishers.noBody()) + .build() + + completeAction(retryCount + 1, newRequest, paymentId, transactionId, timeBeforeCall, deadline) + }, delay, TimeUnit.MILLISECONDS) } +} fun now() = System.currentTimeMillis() \ No newline at end of file diff --git a/src/main/kotlin/ru/quipy/payments/logic/PaymentService.kt b/src/main/kotlin/ru/quipy/payments/logic/PaymentService.kt index d67aebbf0..255db77dd 100644 --- a/src/main/kotlin/ru/quipy/payments/logic/PaymentService.kt +++ b/src/main/kotlin/ru/quipy/payments/logic/PaymentService.kt @@ -7,7 +7,7 @@ interface PaymentService { /** * Submit payment request to some external service. */ - suspend fun submitPaymentRequest(paymentId: UUID, amount: Int, paymentStartedAt: Long, deadline: Long) + fun submitPaymentRequest(paymentId: UUID, amount: Int, paymentStartedAt: Long, deadline: Long) } /** @@ -17,7 +17,7 @@ interface PaymentService { */ interface PaymentExternalSystemAdapter { - suspend fun performPaymentAsync(paymentId: UUID, amount: Int, paymentStartedAt: Long, deadline: Long) + fun performPaymentAsync(paymentId: UUID, amount: Int, paymentStartedAt: Long, deadline: Long) fun name(): String diff --git a/src/main/kotlin/ru/quipy/payments/logic/PaymentServiceImpl.kt b/src/main/kotlin/ru/quipy/payments/logic/PaymentServiceImpl.kt index 3fb881462..7aad9b847 100644 --- a/src/main/kotlin/ru/quipy/payments/logic/PaymentServiceImpl.kt +++ b/src/main/kotlin/ru/quipy/payments/logic/PaymentServiceImpl.kt @@ -13,7 +13,7 @@ class PaymentSystemImpl( val logger = LoggerFactory.getLogger(PaymentSystemImpl::class.java) } - override suspend fun submitPaymentRequest(paymentId: UUID, amount: Int, paymentStartedAt: Long, deadline: Long) { + override fun submitPaymentRequest(paymentId: UUID, amount: Int, paymentStartedAt: Long, deadline: Long) { for (account in paymentAccounts) { account.performPaymentAsync(paymentId, amount, paymentStartedAt, deadline) }