From dd8540c44e9a5317cf69045ff9045b823f2b3b69 Mon Sep 17 00:00:00 2001 From: RuReVange Date: Sun, 7 Dec 2025 12:26:35 +0300 Subject: [PATCH 01/24] fix: functionality without suspend --- .../ru/quipy/apigateway/APIController.kt | 6 +++--- .../common/utils/SlidingWindowRateLimiter.kt | 18 ++++++++++++------ .../ru/quipy/payments/logic/OrderPayer.kt | 2 +- .../logic/PaymentExternalServiceImpl.kt | 11 ++++------- .../ru/quipy/payments/logic/PaymentService.kt | 4 ++-- .../quipy/payments/logic/PaymentServiceImpl.kt | 2 +- 6 files changed, 23 insertions(+), 20 deletions(-) diff --git a/src/main/kotlin/ru/quipy/apigateway/APIController.kt b/src/main/kotlin/ru/quipy/apigateway/APIController.kt index 6a3ab4966..9c4d48218 100644 --- a/src/main/kotlin/ru/quipy/apigateway/APIController.kt +++ b/src/main/kotlin/ru/quipy/apigateway/APIController.kt @@ -23,7 +23,7 @@ class APIController( val logger: Logger = LoggerFactory.getLogger(APIController::class.java) @PostMapping("/users") - suspend fun createUser(@RequestBody req: CreateUserRequest): User { + fun createUser(@RequestBody req: CreateUserRequest): User { return User(UUID.randomUUID(), req.name) } @@ -32,7 +32,7 @@ class APIController( data class User(val id: UUID, val name: String) @PostMapping("/orders") - suspend fun createOrder(@RequestParam userId: UUID, @RequestParam price: Int): Order { + fun createOrder(@RequestParam userId: UUID, @RequestParam price: Int): Order { val order = Order( UUID.randomUUID(), userId, @@ -58,7 +58,7 @@ class APIController( } @PostMapping("/orders/{orderId}/payment") - suspend fun payOrder(@PathVariable orderId: UUID, @RequestParam deadline: Long): PaymentSubmissionDto { + fun payOrder(@PathVariable orderId: UUID, @RequestParam deadline: Long): PaymentSubmissionDto { if (!rateLimiter.tick()) { throw TooManyRequestsException(deadline) } diff --git a/src/main/kotlin/ru/quipy/common/utils/SlidingWindowRateLimiter.kt b/src/main/kotlin/ru/quipy/common/utils/SlidingWindowRateLimiter.kt index 4537b42a8..4f3f64050 100644 --- a/src/main/kotlin/ru/quipy/common/utils/SlidingWindowRateLimiter.kt +++ b/src/main/kotlin/ru/quipy/common/utils/SlidingWindowRateLimiter.kt @@ -1,7 +1,5 @@ package ru.quipy.common.utils -import com.github.dockerjava.zerodep.shaded.org.apache.hc.core5.util.Deadline -import com.github.dockerjava.zerodep.shaded.org.apache.hc.core5.util.Timeout import kotlinx.coroutines.CoroutineScope import kotlinx.coroutines.asCoroutineDispatcher import kotlinx.coroutines.delay @@ -33,18 +31,26 @@ class SlidingWindowRateLimiter( } } - fun tickBlocking() { + fun tickBlockingAsync() { while (!tick()) { Thread.sleep(10) } } - suspend fun tickBlocking(timeout: Long): Boolean { + fun tickBlockingWithTimeout(timeout: Long): Boolean { val timeStarted = System.currentTimeMillis() - while (System.currentTimeMillis()-timeStarted < timeout && !tick()) { + while (System.currentTimeMillis() - timeStarted < timeout && !tick()) { + Thread.sleep(2) + } + return System.currentTimeMillis() - timeStarted < timeout + } + + suspend fun tickBlockingAsync(timeout: Long): Boolean { + val timeStarted = System.currentTimeMillis() + while (System.currentTimeMillis() - timeStarted < timeout && !tick()) { delay(2) } - return System.currentTimeMillis()-timeStarted < timeout + return System.currentTimeMillis() - timeStarted < timeout } diff --git a/src/main/kotlin/ru/quipy/payments/logic/OrderPayer.kt b/src/main/kotlin/ru/quipy/payments/logic/OrderPayer.kt index 8861311a1..e2bd9e203 100644 --- a/src/main/kotlin/ru/quipy/payments/logic/OrderPayer.kt +++ b/src/main/kotlin/ru/quipy/payments/logic/OrderPayer.kt @@ -26,7 +26,7 @@ class OrderPayer(meterRegistry: MeterRegistry) { private lateinit var paymentService: PaymentService - suspend fun processPayment(orderId: UUID, amount: Int, paymentId: UUID, deadline: Long): Long { + fun processPayment(orderId: UUID, amount: Int, paymentId: UUID, deadline: Long): Long { val createdAt = System.currentTimeMillis() plannedRequests.increment() val createdEvent = paymentESService.create { diff --git a/src/main/kotlin/ru/quipy/payments/logic/PaymentExternalServiceImpl.kt b/src/main/kotlin/ru/quipy/payments/logic/PaymentExternalServiceImpl.kt index 0f9475fc2..df99db920 100644 --- a/src/main/kotlin/ru/quipy/payments/logic/PaymentExternalServiceImpl.kt +++ b/src/main/kotlin/ru/quipy/payments/logic/PaymentExternalServiceImpl.kt @@ -2,9 +2,7 @@ package ru.quipy.payments.logic import com.fasterxml.jackson.databind.ObjectMapper import com.fasterxml.jackson.module.kotlin.registerKotlinModule -import com.github.dockerjava.api.model.Link import io.micrometer.core.instrument.MeterRegistry -import kotlinx.coroutines.delay import kotlinx.coroutines.sync.Semaphore import org.slf4j.LoggerFactory import ru.quipy.common.utils.SlidingWindowRateLimiter @@ -61,8 +59,7 @@ class PaymentExternalSystemAdapterImpl( .version(HttpClient.Version.HTTP_2) .build() - - override suspend fun performPaymentAsync(paymentId: UUID, amount: Int, paymentStartedAt: Long, deadline: Long) { + override fun performPaymentAsync(paymentId: UUID, amount: Int, paymentStartedAt: Long, deadline: Long) { logger.warn("[$accountName] Submitting payment request for payment $paymentId") val transactionId = UUID.randomUUID() @@ -84,7 +81,7 @@ class PaymentExternalSystemAdapterImpl( val timeBeforeCall = now() remaining.coerceAtMost(this.remaining) tryAcquire(now(), remaining) - if (!rateLimit.tickBlocking(deadline- now())) { + if (!rateLimit.tickBlockingWithTimeout(deadline - now())) { throw TooManyRequestsException(deadline) } val request = HttpRequest.newBuilder() @@ -106,7 +103,7 @@ class PaymentExternalSystemAdapterImpl( fun tryAcquire(startedAt: Long, remaining: Long): Boolean { var isAcquired = parallelLimiter.tryAcquire() - while (!isAcquired && now()-startedAt < remaining) { + while (!isAcquired && now() - startedAt < remaining) { isAcquired = parallelLimiter.tryAcquire() } @@ -159,7 +156,7 @@ class PaymentExternalSystemAdapterImpl( } startedRequests.increment() } else { - Thread.sleep(backoff) + Thread.sleep(capped) completeAction( retryCount + 1, request, diff --git a/src/main/kotlin/ru/quipy/payments/logic/PaymentService.kt b/src/main/kotlin/ru/quipy/payments/logic/PaymentService.kt index d67aebbf0..255db77dd 100644 --- a/src/main/kotlin/ru/quipy/payments/logic/PaymentService.kt +++ b/src/main/kotlin/ru/quipy/payments/logic/PaymentService.kt @@ -7,7 +7,7 @@ interface PaymentService { /** * Submit payment request to some external service. */ - suspend fun submitPaymentRequest(paymentId: UUID, amount: Int, paymentStartedAt: Long, deadline: Long) + fun submitPaymentRequest(paymentId: UUID, amount: Int, paymentStartedAt: Long, deadline: Long) } /** @@ -17,7 +17,7 @@ interface PaymentService { */ interface PaymentExternalSystemAdapter { - suspend fun performPaymentAsync(paymentId: UUID, amount: Int, paymentStartedAt: Long, deadline: Long) + fun performPaymentAsync(paymentId: UUID, amount: Int, paymentStartedAt: Long, deadline: Long) fun name(): String diff --git a/src/main/kotlin/ru/quipy/payments/logic/PaymentServiceImpl.kt b/src/main/kotlin/ru/quipy/payments/logic/PaymentServiceImpl.kt index 3fb881462..7aad9b847 100644 --- a/src/main/kotlin/ru/quipy/payments/logic/PaymentServiceImpl.kt +++ b/src/main/kotlin/ru/quipy/payments/logic/PaymentServiceImpl.kt @@ -13,7 +13,7 @@ class PaymentSystemImpl( val logger = LoggerFactory.getLogger(PaymentSystemImpl::class.java) } - override suspend fun submitPaymentRequest(paymentId: UUID, amount: Int, paymentStartedAt: Long, deadline: Long) { + override fun submitPaymentRequest(paymentId: UUID, amount: Int, paymentStartedAt: Long, deadline: Long) { for (account in paymentAccounts) { account.performPaymentAsync(paymentId, amount, paymentStartedAt, deadline) } From 58ca47ca3f4c6e847ea03ccbf79c4a14bc70e4b3 Mon Sep 17 00:00:00 2001 From: RuReVange Date: Sun, 7 Dec 2025 13:46:56 +0300 Subject: [PATCH 02/24] fix: correct retry after logic, semaphore release --- .../ru/quipy/apigateway/APIController.kt | 4 ++- .../apigateway/GlobalExceptionHandler.kt | 7 ++++- .../exceptions/TooManyRequestsException.kt | 2 +- .../logic/PaymentExternalServiceImpl.kt | 26 +++++++++++++++---- 4 files changed, 31 insertions(+), 8 deletions(-) diff --git a/src/main/kotlin/ru/quipy/apigateway/APIController.kt b/src/main/kotlin/ru/quipy/apigateway/APIController.kt index 9c4d48218..251ee553e 100644 --- a/src/main/kotlin/ru/quipy/apigateway/APIController.kt +++ b/src/main/kotlin/ru/quipy/apigateway/APIController.kt @@ -11,6 +11,7 @@ import ru.quipy.orders.repository.OrderRepository import ru.quipy.payments.logic.OrderPayer import java.time.Duration import java.util.* +import kotlin.random.Random @RestController class APIController( @@ -60,7 +61,8 @@ class APIController( @PostMapping("/orders/{orderId}/payment") fun payOrder(@PathVariable orderId: UUID, @RequestParam deadline: Long): PaymentSubmissionDto { if (!rateLimiter.tick()) { - throw TooManyRequestsException(deadline) + val retryAfterMs = 10L + Random.nextLong(10) + throw TooManyRequestsException(retryAfterMs) } val paymentId = UUID.randomUUID() val order = orderRepository.findById(orderId)?.let { diff --git a/src/main/kotlin/ru/quipy/apigateway/GlobalExceptionHandler.kt b/src/main/kotlin/ru/quipy/apigateway/GlobalExceptionHandler.kt index 5679ea372..ea33b9e32 100644 --- a/src/main/kotlin/ru/quipy/apigateway/GlobalExceptionHandler.kt +++ b/src/main/kotlin/ru/quipy/apigateway/GlobalExceptionHandler.kt @@ -26,8 +26,13 @@ class GlobalExceptionHandler( fun handleTooManyRequests(exception: TooLongRequestException): ResponseEntity { return ResponseEntity.status(200).body("your request very long, i am so sorry") } + @ExceptionHandler(TooManyRequestsException::class) fun handleTooManyRequestsRetriable(exception: TooManyRequestsException): ResponseEntity { - return ResponseEntity.status(HttpStatus.TOO_MANY_REQUESTS).header("Retry-After", "1").body("to many requests") + val retryAfterSeconds = (exception.retryAfterMs / 1000.0).coerceAtLeast(0.1) + return ResponseEntity + .status(HttpStatus.TOO_MANY_REQUESTS) + .header("Retry-After", retryAfterSeconds.toString()) + .body("too many requests") } } \ No newline at end of file diff --git a/src/main/kotlin/ru/quipy/exceptions/TooManyRequestsException.kt b/src/main/kotlin/ru/quipy/exceptions/TooManyRequestsException.kt index cacd67810..3f063849a 100644 --- a/src/main/kotlin/ru/quipy/exceptions/TooManyRequestsException.kt +++ b/src/main/kotlin/ru/quipy/exceptions/TooManyRequestsException.kt @@ -1,3 +1,3 @@ package ru.quipy.exceptions -class TooManyRequestsException(val deadline: Long) : RuntimeException() \ No newline at end of file +class TooManyRequestsException(val retryAfterMs: Long) : RuntimeException() \ No newline at end of file diff --git a/src/main/kotlin/ru/quipy/payments/logic/PaymentExternalServiceImpl.kt b/src/main/kotlin/ru/quipy/payments/logic/PaymentExternalServiceImpl.kt index df99db920..8ee7edf32 100644 --- a/src/main/kotlin/ru/quipy/payments/logic/PaymentExternalServiceImpl.kt +++ b/src/main/kotlin/ru/quipy/payments/logic/PaymentExternalServiceImpl.kt @@ -20,6 +20,7 @@ import java.util.* import java.util.concurrent.Executors import java.util.concurrent.TimeUnit import kotlin.math.pow +import kotlin.random.Random // Advice: always treat time as a Duration @@ -36,7 +37,7 @@ class PaymentExternalSystemAdapterImpl( val mapper = ObjectMapper().registerKotlinModule() } - private val remaining = 20_000L + private val maxRequestTimeout = 20_000L // 2025-11-20T20:30:35.780+03:00 INFO 56644 --- [alhost:1234/...] ru.quipy.core.EventSourcingService : Optimistic lock exception. Failed to save event records id: [7dca693e-e811-4b7f-8bce-23e13d952c04-4] private val startedRequests = meterRegistry.counter("payment.processing.started", "accountName", properties.accountName) private val timer = meterRegistry.timer("payment.external.system.request.latency", "accountName", properties.accountName) @@ -79,14 +80,28 @@ class PaymentExternalSystemAdapterImpl( } val timeBeforeCall = now() - remaining.coerceAtMost(this.remaining) - tryAcquire(now(), remaining) + + if (!tryAcquire(now(), deadline - now())) { + val retryAfterMs = (requestAverageProcessingTime.toMillis() / parallelRequests * 10).coerceIn(10, 100) + Random.nextLong(10) + + throw TooManyRequestsException(retryAfterMs) + } + if (!rateLimit.tickBlockingWithTimeout(deadline - now())) { - throw TooManyRequestsException(deadline) + parallelLimiter.release() + + val retryAfterMs = (1000L / rateLimitPerSec * 10).coerceIn(10, 100) + Random.nextLong(10) + throw TooManyRequestsException(retryAfterMs) } + + val requestTimeout = minOf( + deadline - now(), + requestAverageProcessingTime.toMillis() * 2 + ).coerceAtLeast(100) + val request = HttpRequest.newBuilder() .uri(URI("http://$paymentProviderHostPort/external/process?serviceName=$serviceName&token=$token&accountName=$accountName&transactionId=$transactionId&paymentId=$paymentId&amount=$amount")) - .timeout(Duration.ofMillis(30_000)) + .timeout(Duration.ofMillis(requestTimeout)) .POST(HttpRequest.BodyPublishers.noBody()) .build() @@ -155,6 +170,7 @@ class PaymentExternalSystemAdapterImpl( it.logProcessing(false, now(), transactionId, "Deadline expired") } startedRequests.increment() + parallelLimiter.release() } else { Thread.sleep(capped) completeAction( From ffccdc33de7b866a59b8b63d31fb8ae605ee08bb Mon Sep 17 00:00:00 2001 From: RuReVange Date: Sun, 7 Dec 2025 14:06:33 +0300 Subject: [PATCH 03/24] fix: logging duplicate --- .../payments/logic/PaymentExternalServiceImpl.kt | 14 +------------- 1 file changed, 1 insertion(+), 13 deletions(-) diff --git a/src/main/kotlin/ru/quipy/payments/logic/PaymentExternalServiceImpl.kt b/src/main/kotlin/ru/quipy/payments/logic/PaymentExternalServiceImpl.kt index 8ee7edf32..190614df6 100644 --- a/src/main/kotlin/ru/quipy/payments/logic/PaymentExternalServiceImpl.kt +++ b/src/main/kotlin/ru/quipy/payments/logic/PaymentExternalServiceImpl.kt @@ -37,7 +37,6 @@ class PaymentExternalSystemAdapterImpl( val mapper = ObjectMapper().registerKotlinModule() } - private val maxRequestTimeout = 20_000L // 2025-11-20T20:30:35.780+03:00 INFO 56644 --- [alhost:1234/...] ru.quipy.core.EventSourcingService : Optimistic lock exception. Failed to save event records id: [7dca693e-e811-4b7f-8bce-23e13d952c04-4] private val startedRequests = meterRegistry.counter("payment.processing.started", "accountName", properties.accountName) private val timer = meterRegistry.timer("payment.external.system.request.latency", "accountName", properties.accountName) @@ -120,6 +119,7 @@ class PaymentExternalSystemAdapterImpl( var isAcquired = parallelLimiter.tryAcquire() while (!isAcquired && now() - startedAt < remaining) { isAcquired = parallelLimiter.tryAcquire() + Thread.sleep(1) } return isAcquired @@ -133,26 +133,14 @@ class PaymentExternalSystemAdapterImpl( when (throwable.cause) { is SocketTimeoutException -> { logger.warn("[$accountName] attempt ${retryCount + 1} timeout: $paymentId", e) - paymentESService.update(paymentId) { - it.logProcessing(false, now(), transactionId, "socket timeout") - } } is InterruptedIOException -> { logger.warn("[$accountName] interrupted: $paymentId", e) - - // Здесь мы обновляем состояние оплаты в зависимости от результата в базе данных оплат. - // Это требуется сделать ВО ВСЕХ ИСХОДАХ (успешная оплата / неуспешная / ошибочная ситуация) - paymentESService.update(paymentId) { - it.logProcessing(false, now(), transactionId, "interrupted IO") - } } else -> { logger.warn("[$accountName] io error: $paymentId", e) - paymentESService.update(paymentId) { - it.logProcessing(false, now(), transactionId, "io exception") - } } } From d68f49c3cb79de1d2a906be951c7e1d3f71a88eb Mon Sep 17 00:00:00 2001 From: RuReVange Date: Sun, 7 Dec 2025 14:17:58 +0300 Subject: [PATCH 04/24] feat: retry with new timeout for request --- .../quipy/payments/logic/PaymentExternalServiceImpl.kt | 10 +++++++++- 1 file changed, 9 insertions(+), 1 deletion(-) diff --git a/src/main/kotlin/ru/quipy/payments/logic/PaymentExternalServiceImpl.kt b/src/main/kotlin/ru/quipy/payments/logic/PaymentExternalServiceImpl.kt index 190614df6..22f7923cd 100644 --- a/src/main/kotlin/ru/quipy/payments/logic/PaymentExternalServiceImpl.kt +++ b/src/main/kotlin/ru/quipy/payments/logic/PaymentExternalServiceImpl.kt @@ -161,9 +161,17 @@ class PaymentExternalSystemAdapterImpl( parallelLimiter.release() } else { Thread.sleep(capped) + + val newRequestTimeout = (deadline - now()).coerceIn(100, requestAverageProcessingTime.toMillis() * 2) + val newRequest = HttpRequest.newBuilder() + .uri(request.uri()) + .timeout(Duration.ofMillis(newRequestTimeout)) + .POST(HttpRequest.BodyPublishers.noBody()) + .build() + completeAction( retryCount + 1, - request, + newRequest, paymentId, transactionId, timeBeforeCall, From d9a15ca13a78f1760a6f7c4b050af71eb064e430 Mon Sep 17 00:00:00 2001 From: RuReVange Date: Sun, 7 Dec 2025 14:26:17 +0300 Subject: [PATCH 05/24] fix: softer rate limiter --- .../ru/quipy/payments/logic/PaymentExternalServiceImpl.kt | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/main/kotlin/ru/quipy/payments/logic/PaymentExternalServiceImpl.kt b/src/main/kotlin/ru/quipy/payments/logic/PaymentExternalServiceImpl.kt index 22f7923cd..c597b9765 100644 --- a/src/main/kotlin/ru/quipy/payments/logic/PaymentExternalServiceImpl.kt +++ b/src/main/kotlin/ru/quipy/payments/logic/PaymentExternalServiceImpl.kt @@ -48,7 +48,7 @@ class PaymentExternalSystemAdapterImpl( private val rateLimit: SlidingWindowRateLimiter by lazy { SlidingWindowRateLimiter( - rate = rateLimitPerSec.toLong(), + rate = (rateLimitPerSec * 0.95).toLong(), window = Duration.ofMillis(1000), ) } From 060407be3da8393ec00b28430f9d7237560370bc Mon Sep 17 00:00:00 2001 From: RuReVange Date: Sun, 7 Dec 2025 14:39:51 +0300 Subject: [PATCH 06/24] fix: additional retry after --- .../logic/PaymentExternalServiceImpl.kt | 19 +++++++++++++++---- 1 file changed, 15 insertions(+), 4 deletions(-) diff --git a/src/main/kotlin/ru/quipy/payments/logic/PaymentExternalServiceImpl.kt b/src/main/kotlin/ru/quipy/payments/logic/PaymentExternalServiceImpl.kt index c597b9765..5bd3a308c 100644 --- a/src/main/kotlin/ru/quipy/payments/logic/PaymentExternalServiceImpl.kt +++ b/src/main/kotlin/ru/quipy/payments/logic/PaymentExternalServiceImpl.kt @@ -48,7 +48,7 @@ class PaymentExternalSystemAdapterImpl( private val rateLimit: SlidingWindowRateLimiter by lazy { SlidingWindowRateLimiter( - rate = (rateLimitPerSec * 0.95).toLong(), + rate = (rateLimitPerSec * 0.9).toLong(), window = Duration.ofMillis(1000), ) } @@ -70,12 +70,16 @@ class PaymentExternalSystemAdapterImpl( } logger.info("[$accountName] Submit: $paymentId , txId: $transactionId") + val remaining = deadline - now() - if (remaining <= 0) { + val minRequiredTime = requestAverageProcessingTime.toMillis() + 5000 + if (remaining < minRequiredTime) { + logger.warn("[$accountName] Not enough time for payment $paymentId: ${remaining}ms remaining, need ${minRequiredTime}ms") paymentESService.update(paymentId) { - it.logProcessing(false, now(), transactionId, "deadline expired") + it.logProcessing(false, now(), transactionId, "not enough time") } - return + val retryAfterMs = minRequiredTime - remaining + Random.nextLong(100) + throw TooManyRequestsException(retryAfterMs) } val timeBeforeCall = now() @@ -98,6 +102,13 @@ class PaymentExternalSystemAdapterImpl( requestAverageProcessingTime.toMillis() * 2 ).coerceAtLeast(100) + if (requestTimeout < requestAverageProcessingTime.toMillis()) { + logger.warn("[$accountName] Timeout too short for payment $paymentId: ${requestTimeout}ms") + parallelLimiter.release() + val retryAfterMs = requestAverageProcessingTime.toMillis() - requestTimeout + Random.nextLong(100) + throw TooManyRequestsException(retryAfterMs) + } + val request = HttpRequest.newBuilder() .uri(URI("http://$paymentProviderHostPort/external/process?serviceName=$serviceName&token=$token&accountName=$accountName&transactionId=$transactionId&paymentId=$paymentId&amount=$amount")) .timeout(Duration.ofMillis(requestTimeout)) From c3eaad4f76428b01f83c465739e21e13233a5936 Mon Sep 17 00:00:00 2001 From: RuReVange Date: Sun, 7 Dec 2025 14:47:59 +0300 Subject: [PATCH 07/24] fix: minRequiredTime --- .../ru/quipy/payments/logic/PaymentExternalServiceImpl.kt | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/main/kotlin/ru/quipy/payments/logic/PaymentExternalServiceImpl.kt b/src/main/kotlin/ru/quipy/payments/logic/PaymentExternalServiceImpl.kt index 5bd3a308c..446b90bbe 100644 --- a/src/main/kotlin/ru/quipy/payments/logic/PaymentExternalServiceImpl.kt +++ b/src/main/kotlin/ru/quipy/payments/logic/PaymentExternalServiceImpl.kt @@ -72,7 +72,7 @@ class PaymentExternalSystemAdapterImpl( logger.info("[$accountName] Submit: $paymentId , txId: $transactionId") val remaining = deadline - now() - val minRequiredTime = requestAverageProcessingTime.toMillis() + 5000 + val minRequiredTime = requestAverageProcessingTime.toMillis() * 2 if (remaining < minRequiredTime) { logger.warn("[$accountName] Not enough time for payment $paymentId: ${remaining}ms remaining, need ${minRequiredTime}ms") paymentESService.update(paymentId) { From 586ff6c269650cc6eb87c57faea786a8cc0deb3c Mon Sep 17 00:00:00 2001 From: RuReVange Date: Sun, 7 Dec 2025 14:57:23 +0300 Subject: [PATCH 08/24] feat: sheduler for retry --- .../logic/PaymentExternalServiceImpl.kt | 49 ++++++++++++------- 1 file changed, 32 insertions(+), 17 deletions(-) diff --git a/src/main/kotlin/ru/quipy/payments/logic/PaymentExternalServiceImpl.kt b/src/main/kotlin/ru/quipy/payments/logic/PaymentExternalServiceImpl.kt index 446b90bbe..4f1d559f0 100644 --- a/src/main/kotlin/ru/quipy/payments/logic/PaymentExternalServiceImpl.kt +++ b/src/main/kotlin/ru/quipy/payments/logic/PaymentExternalServiceImpl.kt @@ -59,6 +59,10 @@ class PaymentExternalSystemAdapterImpl( .version(HttpClient.Version.HTTP_2) .build() + private val retryScheduler = Executors.newScheduledThreadPool( + Runtime.getRuntime().availableProcessors() + ) + override fun performPaymentAsync(paymentId: UUID, amount: Int, paymentStartedAt: Long, deadline: Long) { logger.warn("[$accountName] Submitting payment request for payment $paymentId") val transactionId = UUID.randomUUID() @@ -171,23 +175,34 @@ class PaymentExternalSystemAdapterImpl( startedRequests.increment() parallelLimiter.release() } else { - Thread.sleep(capped) - - val newRequestTimeout = (deadline - now()).coerceIn(100, requestAverageProcessingTime.toMillis() * 2) - val newRequest = HttpRequest.newBuilder() - .uri(request.uri()) - .timeout(Duration.ofMillis(newRequestTimeout)) - .POST(HttpRequest.BodyPublishers.noBody()) - .build() - - completeAction( - retryCount + 1, - newRequest, - paymentId, - transactionId, - timeBeforeCall, - deadline - ) + retryScheduler.schedule({ + val remainingTime = deadline - now() + + if (remainingTime < requestAverageProcessingTime.toMillis()) { + paymentESService.update(paymentId) { + it.logProcessing(false, now(), transactionId, "Not enough time for retry") + } + startedRequests.increment() + parallelLimiter.release() + return@schedule + } + + val newRequestTimeout = remainingTime.coerceIn(100, requestAverageProcessingTime.toMillis() * 2) + val newRequest = HttpRequest.newBuilder() + .uri(request.uri()) + .timeout(Duration.ofMillis(newRequestTimeout)) + .POST(HttpRequest.BodyPublishers.noBody()) + .build() + + completeAction( + retryCount + 1, + newRequest, + paymentId, + transactionId, + timeBeforeCall, + deadline + ) + }, capped, TimeUnit.MILLISECONDS) } } } else { From 92ef40493aa5607bf12a7e022ce1796d1af9ef93 Mon Sep 17 00:00:00 2001 From: 21092004Goda Date: Sun, 7 Dec 2025 23:45:34 +0300 Subject: [PATCH 09/24] -- draft: Test async logic in payment external service. --- .../logic/PaymentExternalServiceImpl.kt | 175 ++++++++++++------ 1 file changed, 114 insertions(+), 61 deletions(-) diff --git a/src/main/kotlin/ru/quipy/payments/logic/PaymentExternalServiceImpl.kt b/src/main/kotlin/ru/quipy/payments/logic/PaymentExternalServiceImpl.kt index 4f1d559f0..b67d86e9a 100644 --- a/src/main/kotlin/ru/quipy/payments/logic/PaymentExternalServiceImpl.kt +++ b/src/main/kotlin/ru/quipy/payments/logic/PaymentExternalServiceImpl.kt @@ -3,6 +3,12 @@ package ru.quipy.payments.logic import com.fasterxml.jackson.databind.ObjectMapper import com.fasterxml.jackson.module.kotlin.registerKotlinModule import io.micrometer.core.instrument.MeterRegistry +import kotlinx.coroutines.CoroutineDispatcher +import kotlinx.coroutines.CoroutineExceptionHandler +import kotlinx.coroutines.CoroutineScope +import kotlinx.coroutines.SupervisorJob +import kotlinx.coroutines.asCoroutineDispatcher +import kotlinx.coroutines.launch import kotlinx.coroutines.sync.Semaphore import org.slf4j.LoggerFactory import ru.quipy.common.utils.SlidingWindowRateLimiter @@ -30,16 +36,26 @@ class PaymentExternalSystemAdapterImpl( private val paymentProviderHostPort: String, private val token: String, meterRegistry: MeterRegistry, - private val parallelLimiter: Semaphore + private val parallelLimiter: Semaphore, + private val ioDispatcher: CoroutineDispatcher = Executors.newFixedThreadPool(10).asCoroutineDispatcher() ) : PaymentExternalSystemAdapter { companion object { val logger = LoggerFactory.getLogger(PaymentExternalSystemAdapter::class.java) val mapper = ObjectMapper().registerKotlinModule() } + private val exceptionHandler = CoroutineExceptionHandler { _, throwable -> + logger.error("[$accountName] Unhandled exception in payment adapter coroutine", throwable) + } + + private val scope = CoroutineScope(SupervisorJob() + ioDispatcher + exceptionHandler) + + // 2025-11-20T20:30:35.780+03:00 INFO 56644 --- [alhost:1234/...] ru.quipy.core.EventSourcingService : Optimistic lock exception. Failed to save event records id: [7dca693e-e811-4b7f-8bce-23e13d952c04-4] - private val startedRequests = meterRegistry.counter("payment.processing.started", "accountName", properties.accountName) - private val timer = meterRegistry.timer("payment.external.system.request.latency", "accountName", properties.accountName) + private val startedRequests = + meterRegistry.counter("payment.processing.started", "accountName", properties.accountName) + private val timer = + meterRegistry.timer("payment.external.system.request.latency", "accountName", properties.accountName) private val serviceName = properties.serviceName private val accountName = properties.accountName private val requestAverageProcessingTime = properties.averageProcessingTime @@ -69,8 +85,20 @@ class PaymentExternalSystemAdapterImpl( // Вне зависимости от исхода оплаты важно отметить что она была отправлена. // Это требуется сделать ВО ВСЕХ СЛУЧАЯХ, поскольку эта информация используется сервисом тестирования. - paymentESService.update(paymentId) { - it.logSubmission(success = true, transactionId, now(), Duration.ofMillis(now() - paymentStartedAt)) + scope.launch { + try { + paymentESService.update(paymentId) { + it.logSubmission( + success = true, + transactionId, + now(), + Duration.ofMillis(now() - paymentStartedAt) + ) + } + logger.info("[$accountName] Log submission recorded for $paymentId") + } catch (e: Exception) { + logger.error("[$accountName] Failed to record log submission for $paymentId", e) + } } logger.info("[$accountName] Submit: $paymentId , txId: $transactionId") @@ -89,7 +117,10 @@ class PaymentExternalSystemAdapterImpl( val timeBeforeCall = now() if (!tryAcquire(now(), deadline - now())) { - val retryAfterMs = (requestAverageProcessingTime.toMillis() / parallelRequests * 10).coerceIn(10, 100) + Random.nextLong(10) + val retryAfterMs = (requestAverageProcessingTime.toMillis() / parallelRequests * 10).coerceIn( + 10, + 100 + ) + Random.nextLong(10) throw TooManyRequestsException(retryAfterMs) } @@ -140,74 +171,59 @@ class PaymentExternalSystemAdapterImpl( return isAcquired } - fun completeAction(retryCount: Long, request: HttpRequest, paymentId: UUID, transactionId: UUID, timeBeforeCall: Long, deadline: Long) { + fun completeAction( + retryCount: Long, + request: HttpRequest, + paymentId: UUID, + transactionId: UUID, + timeBeforeCall: Long, + deadline: Long + ) { httpClient.sendAsync(request, HttpResponse.BodyHandlers.ofString()) .whenComplete { response, throwable -> - if (throwable != null) { - val e = throwable.cause - when (throwable.cause) { - is SocketTimeoutException -> { - logger.warn("[$accountName] attempt ${retryCount + 1} timeout: $paymentId", e) - } + scope.launch { + try { + if (throwable != null) { + val e = throwable.cause + when (throwable.cause) { + is SocketTimeoutException -> { + logger.warn("[$accountName] attempt ${retryCount + 1} timeout: $paymentId", e) + } - is InterruptedIOException -> { - logger.warn("[$accountName] interrupted: $paymentId", e) - } + is InterruptedIOException -> { + logger.warn("[$accountName] interrupted: $paymentId", e) + } - else -> { - logger.warn("[$accountName] io error: $paymentId", e) + else -> { + logger.warn("[$accountName] io error: $paymentId", e) + } } - } - if (retryCount + 1 >= 3) { - paymentESService.update(paymentId) { - it.logProcessing(false, now(), transactionId, "Max attempts reached") - } - startedRequests.increment() - parallelLimiter.release() - } else { - val backoff = ((2.0.pow(retryCount.toDouble()) * 25).toLong() + kotlin.random.Random.nextLong(10)) - val capped = backoff.coerceAtMost(deadline - now() - 5) - if (capped <= 0) { + if (retryCount + 1 >= 3) { paymentESService.update(paymentId) { - it.logProcessing(false, now(), transactionId, "Deadline expired") + it.logProcessing(false, now(), transactionId, "Max attempts reached") } - startedRequests.increment() - parallelLimiter.release() } else { - retryScheduler.schedule({ - val remainingTime = deadline - now() - - if (remainingTime < requestAverageProcessingTime.toMillis()) { - paymentESService.update(paymentId) { - it.logProcessing(false, now(), transactionId, "Not enough time for retry") - } - startedRequests.increment() - parallelLimiter.release() - return@schedule + val backoff = ((2.0.pow(retryCount.toDouble()) * 25).toLong() + Random.nextLong(10)) + val capped = backoff.coerceAtMost(deadline - now() - 5) + if (capped <= 0) { + paymentESService.update(paymentId) { + it.logProcessing(false, now(), transactionId, "Deadline expired") } - - val newRequestTimeout = remainingTime.coerceIn(100, requestAverageProcessingTime.toMillis() * 2) - val newRequest = HttpRequest.newBuilder() - .uri(request.uri()) - .timeout(Duration.ofMillis(newRequestTimeout)) - .POST(HttpRequest.BodyPublishers.noBody()) - .build() - - completeAction( - retryCount + 1, - newRequest, + } else { + scheduleRetry( + retryCount, + request, paymentId, transactionId, timeBeforeCall, - deadline + deadline, + capped ) - }, capped, TimeUnit.MILLISECONDS) + return@launch + } } - } - } else { - startedRequests.increment() - try { + } else { logger.warn("Free space in semaphore: {}", parallelLimiter.availablePermits) logger.info( "success in callback for payment: {}, retry count: {}, in time: {}", @@ -225,13 +241,50 @@ class PaymentExternalSystemAdapterImpl( paymentESService.update(paymentId) { it.logProcessing(parsed.result, now(), transactionId, parsed.message) } - } finally { - parallelLimiter.release() timer.record(now() - timeBeforeCall, TimeUnit.MILLISECONDS) } + } catch (e: Exception) { + logger.error("[$accountName] Error processing payment $paymentId", e) + } finally { + startedRequests.increment() + parallelLimiter.release() } } } } + private fun scheduleRetry( + retryCount: Long, request: HttpRequest, paymentId: UUID, + transactionId: UUID, timeBeforeCall: Long, deadline: Long, delay: Long + ) { + retryScheduler.schedule({ + val remainingTime = deadline - now() + if (remainingTime < requestAverageProcessingTime.toMillis()) { + scope.launch { + try { + paymentESService.update(paymentId) { + it.logProcessing(false, now(), transactionId, "Not enough time for retry") + } + } catch (e: Exception) { + logger.error("[$accountName] Failed to record retry failure for $paymentId", e) + } finally { + startedRequests.increment() + parallelLimiter.release() + } + } + return@schedule + } + + val newRequestTimeout = remainingTime.coerceIn(100, requestAverageProcessingTime.toMillis() * 2) + val newRequest = HttpRequest.newBuilder() + .uri(request.uri()) + .timeout(Duration.ofMillis(newRequestTimeout)) + .POST(HttpRequest.BodyPublishers.noBody()) + .build() + + completeAction(retryCount + 1, newRequest, paymentId, transactionId, timeBeforeCall, deadline) + }, delay, TimeUnit.MILLISECONDS) + } +} + fun now() = System.currentTimeMillis() \ No newline at end of file From 873d83ead9cb50624de0b02a15b5a84b303adc70 Mon Sep 17 00:00:00 2001 From: 21092004Goda Date: Sun, 7 Dec 2025 23:45:34 +0300 Subject: [PATCH 10/24] -- draft: Test async logic in payment external service. --- .../logic/PaymentExternalServiceImpl.kt | 179 ++++++++++++------ 1 file changed, 118 insertions(+), 61 deletions(-) diff --git a/src/main/kotlin/ru/quipy/payments/logic/PaymentExternalServiceImpl.kt b/src/main/kotlin/ru/quipy/payments/logic/PaymentExternalServiceImpl.kt index 4f1d559f0..aa1b6f07d 100644 --- a/src/main/kotlin/ru/quipy/payments/logic/PaymentExternalServiceImpl.kt +++ b/src/main/kotlin/ru/quipy/payments/logic/PaymentExternalServiceImpl.kt @@ -3,8 +3,15 @@ package ru.quipy.payments.logic import com.fasterxml.jackson.databind.ObjectMapper import com.fasterxml.jackson.module.kotlin.registerKotlinModule import io.micrometer.core.instrument.MeterRegistry +import kotlinx.coroutines.CoroutineDispatcher +import kotlinx.coroutines.CoroutineExceptionHandler +import kotlinx.coroutines.CoroutineScope +import kotlinx.coroutines.SupervisorJob +import kotlinx.coroutines.asCoroutineDispatcher +import kotlinx.coroutines.launch import kotlinx.coroutines.sync.Semaphore import org.slf4j.LoggerFactory +import ru.quipy.common.utils.NamedThreadFactory import ru.quipy.common.utils.SlidingWindowRateLimiter import ru.quipy.core.EventSourcingService import ru.quipy.exceptions.TooManyRequestsException @@ -30,16 +37,29 @@ class PaymentExternalSystemAdapterImpl( private val paymentProviderHostPort: String, private val token: String, meterRegistry: MeterRegistry, - private val parallelLimiter: Semaphore + private val parallelLimiter: Semaphore, + private val ioDispatcher: CoroutineDispatcher = Executors.newFixedThreadPool( + Runtime.getRuntime().availableProcessors() * 4, + NamedThreadFactory("payment-io-") + ).asCoroutineDispatcher() ) : PaymentExternalSystemAdapter { companion object { val logger = LoggerFactory.getLogger(PaymentExternalSystemAdapter::class.java) val mapper = ObjectMapper().registerKotlinModule() } + private val exceptionHandler = CoroutineExceptionHandler { _, throwable -> + logger.error("[$accountName] Unhandled exception in payment adapter coroutine", throwable) + } + + private val scope = CoroutineScope(SupervisorJob() + ioDispatcher + exceptionHandler) + + // 2025-11-20T20:30:35.780+03:00 INFO 56644 --- [alhost:1234/...] ru.quipy.core.EventSourcingService : Optimistic lock exception. Failed to save event records id: [7dca693e-e811-4b7f-8bce-23e13d952c04-4] - private val startedRequests = meterRegistry.counter("payment.processing.started", "accountName", properties.accountName) - private val timer = meterRegistry.timer("payment.external.system.request.latency", "accountName", properties.accountName) + private val startedRequests = + meterRegistry.counter("payment.processing.started", "accountName", properties.accountName) + private val timer = + meterRegistry.timer("payment.external.system.request.latency", "accountName", properties.accountName) private val serviceName = properties.serviceName private val accountName = properties.accountName private val requestAverageProcessingTime = properties.averageProcessingTime @@ -69,8 +89,20 @@ class PaymentExternalSystemAdapterImpl( // Вне зависимости от исхода оплаты важно отметить что она была отправлена. // Это требуется сделать ВО ВСЕХ СЛУЧАЯХ, поскольку эта информация используется сервисом тестирования. - paymentESService.update(paymentId) { - it.logSubmission(success = true, transactionId, now(), Duration.ofMillis(now() - paymentStartedAt)) + scope.launch { + try { + paymentESService.update(paymentId) { + it.logSubmission( + success = true, + transactionId, + now(), + Duration.ofMillis(now() - paymentStartedAt) + ) + } + logger.info("[$accountName] Log submission recorded for $paymentId") + } catch (e: Exception) { + logger.error("[$accountName] Failed to record log submission for $paymentId", e) + } } logger.info("[$accountName] Submit: $paymentId , txId: $transactionId") @@ -89,7 +121,10 @@ class PaymentExternalSystemAdapterImpl( val timeBeforeCall = now() if (!tryAcquire(now(), deadline - now())) { - val retryAfterMs = (requestAverageProcessingTime.toMillis() / parallelRequests * 10).coerceIn(10, 100) + Random.nextLong(10) + val retryAfterMs = (requestAverageProcessingTime.toMillis() / parallelRequests * 10).coerceIn( + 10, + 100 + ) + Random.nextLong(10) throw TooManyRequestsException(retryAfterMs) } @@ -140,74 +175,59 @@ class PaymentExternalSystemAdapterImpl( return isAcquired } - fun completeAction(retryCount: Long, request: HttpRequest, paymentId: UUID, transactionId: UUID, timeBeforeCall: Long, deadline: Long) { + fun completeAction( + retryCount: Long, + request: HttpRequest, + paymentId: UUID, + transactionId: UUID, + timeBeforeCall: Long, + deadline: Long + ) { httpClient.sendAsync(request, HttpResponse.BodyHandlers.ofString()) .whenComplete { response, throwable -> - if (throwable != null) { - val e = throwable.cause - when (throwable.cause) { - is SocketTimeoutException -> { - logger.warn("[$accountName] attempt ${retryCount + 1} timeout: $paymentId", e) - } + scope.launch { + try { + if (throwable != null) { + val e = throwable.cause + when (throwable.cause) { + is SocketTimeoutException -> { + logger.warn("[$accountName] attempt ${retryCount + 1} timeout: $paymentId", e) + } - is InterruptedIOException -> { - logger.warn("[$accountName] interrupted: $paymentId", e) - } + is InterruptedIOException -> { + logger.warn("[$accountName] interrupted: $paymentId", e) + } - else -> { - logger.warn("[$accountName] io error: $paymentId", e) + else -> { + logger.warn("[$accountName] io error: $paymentId", e) + } } - } - if (retryCount + 1 >= 3) { - paymentESService.update(paymentId) { - it.logProcessing(false, now(), transactionId, "Max attempts reached") - } - startedRequests.increment() - parallelLimiter.release() - } else { - val backoff = ((2.0.pow(retryCount.toDouble()) * 25).toLong() + kotlin.random.Random.nextLong(10)) - val capped = backoff.coerceAtMost(deadline - now() - 5) - if (capped <= 0) { + if (retryCount + 1 >= 3) { paymentESService.update(paymentId) { - it.logProcessing(false, now(), transactionId, "Deadline expired") + it.logProcessing(false, now(), transactionId, "Max attempts reached") } - startedRequests.increment() - parallelLimiter.release() } else { - retryScheduler.schedule({ - val remainingTime = deadline - now() - - if (remainingTime < requestAverageProcessingTime.toMillis()) { - paymentESService.update(paymentId) { - it.logProcessing(false, now(), transactionId, "Not enough time for retry") - } - startedRequests.increment() - parallelLimiter.release() - return@schedule + val backoff = ((2.0.pow(retryCount.toDouble()) * 25).toLong() + Random.nextLong(10)) + val capped = backoff.coerceAtMost(deadline - now() - 5) + if (capped <= 0) { + paymentESService.update(paymentId) { + it.logProcessing(false, now(), transactionId, "Deadline expired") } - - val newRequestTimeout = remainingTime.coerceIn(100, requestAverageProcessingTime.toMillis() * 2) - val newRequest = HttpRequest.newBuilder() - .uri(request.uri()) - .timeout(Duration.ofMillis(newRequestTimeout)) - .POST(HttpRequest.BodyPublishers.noBody()) - .build() - - completeAction( - retryCount + 1, - newRequest, + } else { + scheduleRetry( + retryCount, + request, paymentId, transactionId, timeBeforeCall, - deadline + deadline, + capped ) - }, capped, TimeUnit.MILLISECONDS) + return@launch + } } - } - } else { - startedRequests.increment() - try { + } else { logger.warn("Free space in semaphore: {}", parallelLimiter.availablePermits) logger.info( "success in callback for payment: {}, retry count: {}, in time: {}", @@ -225,13 +245,50 @@ class PaymentExternalSystemAdapterImpl( paymentESService.update(paymentId) { it.logProcessing(parsed.result, now(), transactionId, parsed.message) } - } finally { - parallelLimiter.release() timer.record(now() - timeBeforeCall, TimeUnit.MILLISECONDS) } + } catch (e: Exception) { + logger.error("[$accountName] Error processing payment $paymentId", e) + } finally { + startedRequests.increment() + parallelLimiter.release() } } } } + private fun scheduleRetry( + retryCount: Long, request: HttpRequest, paymentId: UUID, + transactionId: UUID, timeBeforeCall: Long, deadline: Long, delay: Long + ) { + retryScheduler.schedule({ + val remainingTime = deadline - now() + if (remainingTime < requestAverageProcessingTime.toMillis()) { + scope.launch { + try { + paymentESService.update(paymentId) { + it.logProcessing(false, now(), transactionId, "Not enough time for retry") + } + } catch (e: Exception) { + logger.error("[$accountName] Failed to record retry failure for $paymentId", e) + } finally { + startedRequests.increment() + parallelLimiter.release() + } + } + return@schedule + } + + val newRequestTimeout = remainingTime.coerceIn(100, requestAverageProcessingTime.toMillis() * 2) + val newRequest = HttpRequest.newBuilder() + .uri(request.uri()) + .timeout(Duration.ofMillis(newRequestTimeout)) + .POST(HttpRequest.BodyPublishers.noBody()) + .build() + + completeAction(retryCount + 1, newRequest, paymentId, transactionId, timeBeforeCall, deadline) + }, delay, TimeUnit.MILLISECONDS) + } +} + fun now() = System.currentTimeMillis() \ No newline at end of file From 3a4c8e9901b5d1fa236588153c249ebd12a20a87 Mon Sep 17 00:00:00 2001 From: 21092004Goda Date: Mon, 8 Dec 2025 00:24:55 +0300 Subject: [PATCH 11/24] Merge remote-tracking branch 'origin/hw-9-io-kuro' into hw-9-io-kuro # Conflicts: # src/main/kotlin/ru/quipy/payments/logic/PaymentExternalServiceImpl.kt --- .../ru/quipy/payments/logic/PaymentExternalServiceImpl.kt | 8 +++++++- 1 file changed, 7 insertions(+), 1 deletion(-) diff --git a/src/main/kotlin/ru/quipy/payments/logic/PaymentExternalServiceImpl.kt b/src/main/kotlin/ru/quipy/payments/logic/PaymentExternalServiceImpl.kt index a150dcffc..c9c25ccf8 100644 --- a/src/main/kotlin/ru/quipy/payments/logic/PaymentExternalServiceImpl.kt +++ b/src/main/kotlin/ru/quipy/payments/logic/PaymentExternalServiceImpl.kt @@ -39,7 +39,7 @@ class PaymentExternalSystemAdapterImpl( meterRegistry: MeterRegistry, private val parallelLimiter: Semaphore, private val ioDispatcher: CoroutineDispatcher = Executors.newFixedThreadPool( - Runtime.getRuntime().availableProcessors() * 8, + (properties.parallelRequests / 20).coerceIn(100, 1000), NamedThreadFactory("payment-io-") ).asCoroutineDispatcher() ) : PaymentExternalSystemAdapter { @@ -48,6 +48,8 @@ class PaymentExternalSystemAdapterImpl( val mapper = ObjectMapper().registerKotlinModule() } + private val coroutineLimiter = Semaphore(5000) + private val exceptionHandler = CoroutineExceptionHandler { _, throwable -> logger.error("[$accountName] Unhandled exception in payment adapter coroutine", throwable) } @@ -87,6 +89,10 @@ class PaymentExternalSystemAdapterImpl( logger.warn("[$accountName] Submitting payment request for payment $paymentId") val transactionId = UUID.randomUUID() + if (!coroutineLimiter.tryAcquire()) { + // Слишком много активных корутин - отклонить + throw TooManyRequestsException(100L) + } // Вне зависимости от исхода оплаты важно отметить что она была отправлена. // Это требуется сделать ВО ВСЕХ СЛУЧАЯХ, поскольку эта информация используется сервисом тестирования. scope.launch { From 9bd1887e1a1a58c135b74c85bdfa0bb4035ae632 Mon Sep 17 00:00:00 2001 From: 21092004Goda Date: Mon, 8 Dec 2025 00:30:00 +0300 Subject: [PATCH 12/24] Merge remote-tracking branch 'origin/hw-9-io-kuro' into hw-9-io-kuro # Conflicts: # src/main/kotlin/ru/quipy/payments/logic/PaymentExternalServiceImpl.kt --- .../ru/quipy/payments/logic/PaymentExternalServiceImpl.kt | 8 +------- 1 file changed, 1 insertion(+), 7 deletions(-) diff --git a/src/main/kotlin/ru/quipy/payments/logic/PaymentExternalServiceImpl.kt b/src/main/kotlin/ru/quipy/payments/logic/PaymentExternalServiceImpl.kt index c9c25ccf8..086cf25dc 100644 --- a/src/main/kotlin/ru/quipy/payments/logic/PaymentExternalServiceImpl.kt +++ b/src/main/kotlin/ru/quipy/payments/logic/PaymentExternalServiceImpl.kt @@ -39,7 +39,7 @@ class PaymentExternalSystemAdapterImpl( meterRegistry: MeterRegistry, private val parallelLimiter: Semaphore, private val ioDispatcher: CoroutineDispatcher = Executors.newFixedThreadPool( - (properties.parallelRequests / 20).coerceIn(100, 1000), + Runtime.getRuntime().availableProcessors() * 2, NamedThreadFactory("payment-io-") ).asCoroutineDispatcher() ) : PaymentExternalSystemAdapter { @@ -48,8 +48,6 @@ class PaymentExternalSystemAdapterImpl( val mapper = ObjectMapper().registerKotlinModule() } - private val coroutineLimiter = Semaphore(5000) - private val exceptionHandler = CoroutineExceptionHandler { _, throwable -> logger.error("[$accountName] Unhandled exception in payment adapter coroutine", throwable) } @@ -89,10 +87,6 @@ class PaymentExternalSystemAdapterImpl( logger.warn("[$accountName] Submitting payment request for payment $paymentId") val transactionId = UUID.randomUUID() - if (!coroutineLimiter.tryAcquire()) { - // Слишком много активных корутин - отклонить - throw TooManyRequestsException(100L) - } // Вне зависимости от исхода оплаты важно отметить что она была отправлена. // Это требуется сделать ВО ВСЕХ СЛУЧАЯХ, поскольку эта информация используется сервисом тестирования. scope.launch { From 7f6250ff9f2c221fbe25077a19f77fb962dfbe77 Mon Sep 17 00:00:00 2001 From: 21092004Goda Date: Mon, 8 Dec 2025 00:35:29 +0300 Subject: [PATCH 13/24] Merge remote-tracking branch 'origin/hw-9-io-kuro' into hw-9-io-kuro # Conflicts: # src/main/kotlin/ru/quipy/payments/logic/PaymentExternalServiceImpl.kt --- .../ru/quipy/payments/logic/PaymentExternalServiceImpl.kt | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/main/kotlin/ru/quipy/payments/logic/PaymentExternalServiceImpl.kt b/src/main/kotlin/ru/quipy/payments/logic/PaymentExternalServiceImpl.kt index 086cf25dc..35d1055e5 100644 --- a/src/main/kotlin/ru/quipy/payments/logic/PaymentExternalServiceImpl.kt +++ b/src/main/kotlin/ru/quipy/payments/logic/PaymentExternalServiceImpl.kt @@ -39,7 +39,7 @@ class PaymentExternalSystemAdapterImpl( meterRegistry: MeterRegistry, private val parallelLimiter: Semaphore, private val ioDispatcher: CoroutineDispatcher = Executors.newFixedThreadPool( - Runtime.getRuntime().availableProcessors() * 2, + Runtime.getRuntime().availableProcessors(), NamedThreadFactory("payment-io-") ).asCoroutineDispatcher() ) : PaymentExternalSystemAdapter { From 47d7c7a9c877c82d78ef67c2cda9ccb698599658 Mon Sep 17 00:00:00 2001 From: 21092004Goda Date: Mon, 8 Dec 2025 00:47:39 +0300 Subject: [PATCH 14/24] Merge remote-tracking branch 'origin/hw-9-io-kuro' into hw-9-io-kuro # Conflicts: # src/main/kotlin/ru/quipy/payments/logic/PaymentExternalServiceImpl.kt --- .../ru/quipy/payments/logic/PaymentExternalServiceImpl.kt | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/main/kotlin/ru/quipy/payments/logic/PaymentExternalServiceImpl.kt b/src/main/kotlin/ru/quipy/payments/logic/PaymentExternalServiceImpl.kt index 35d1055e5..67e72355a 100644 --- a/src/main/kotlin/ru/quipy/payments/logic/PaymentExternalServiceImpl.kt +++ b/src/main/kotlin/ru/quipy/payments/logic/PaymentExternalServiceImpl.kt @@ -39,7 +39,7 @@ class PaymentExternalSystemAdapterImpl( meterRegistry: MeterRegistry, private val parallelLimiter: Semaphore, private val ioDispatcher: CoroutineDispatcher = Executors.newFixedThreadPool( - Runtime.getRuntime().availableProcessors(), + (Runtime.getRuntime().availableProcessors() * 1.5).toInt(), NamedThreadFactory("payment-io-") ).asCoroutineDispatcher() ) : PaymentExternalSystemAdapter { From ccd9f041b55392b21073d52de0fce8f8e222867b Mon Sep 17 00:00:00 2001 From: 21092004Goda Date: Mon, 8 Dec 2025 00:50:14 +0300 Subject: [PATCH 15/24] Merge remote-tracking branch 'origin/hw-9-io-kuro' into hw-9-io-kuro # Conflicts: # src/main/kotlin/ru/quipy/payments/logic/PaymentExternalServiceImpl.kt --- .../ru/quipy/payments/logic/PaymentExternalServiceImpl.kt | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/main/kotlin/ru/quipy/payments/logic/PaymentExternalServiceImpl.kt b/src/main/kotlin/ru/quipy/payments/logic/PaymentExternalServiceImpl.kt index 67e72355a..086cf25dc 100644 --- a/src/main/kotlin/ru/quipy/payments/logic/PaymentExternalServiceImpl.kt +++ b/src/main/kotlin/ru/quipy/payments/logic/PaymentExternalServiceImpl.kt @@ -39,7 +39,7 @@ class PaymentExternalSystemAdapterImpl( meterRegistry: MeterRegistry, private val parallelLimiter: Semaphore, private val ioDispatcher: CoroutineDispatcher = Executors.newFixedThreadPool( - (Runtime.getRuntime().availableProcessors() * 1.5).toInt(), + Runtime.getRuntime().availableProcessors() * 2, NamedThreadFactory("payment-io-") ).asCoroutineDispatcher() ) : PaymentExternalSystemAdapter { From d96a1859fd646d3beb14866e33d11ef3b7c8f7c7 Mon Sep 17 00:00:00 2001 From: 21092004Goda Date: Mon, 8 Dec 2025 00:55:52 +0300 Subject: [PATCH 16/24] Merge remote-tracking branch 'origin/hw-9-io-kuro' into hw-9-io-kuro # Conflicts: # src/main/kotlin/ru/quipy/payments/logic/PaymentExternalServiceImpl.kt --- .../logic/PaymentExternalServiceImpl.kt | 162 ++++++++++-------- 1 file changed, 86 insertions(+), 76 deletions(-) diff --git a/src/main/kotlin/ru/quipy/payments/logic/PaymentExternalServiceImpl.kt b/src/main/kotlin/ru/quipy/payments/logic/PaymentExternalServiceImpl.kt index 086cf25dc..89502dc6d 100644 --- a/src/main/kotlin/ru/quipy/payments/logic/PaymentExternalServiceImpl.kt +++ b/src/main/kotlin/ru/quipy/payments/logic/PaymentExternalServiceImpl.kt @@ -3,12 +3,7 @@ package ru.quipy.payments.logic import com.fasterxml.jackson.databind.ObjectMapper import com.fasterxml.jackson.module.kotlin.registerKotlinModule import io.micrometer.core.instrument.MeterRegistry -import kotlinx.coroutines.CoroutineDispatcher -import kotlinx.coroutines.CoroutineExceptionHandler -import kotlinx.coroutines.CoroutineScope -import kotlinx.coroutines.SupervisorJob -import kotlinx.coroutines.asCoroutineDispatcher -import kotlinx.coroutines.launch +import kotlinx.coroutines.* import kotlinx.coroutines.sync.Semaphore import org.slf4j.LoggerFactory import ru.quipy.common.utils.NamedThreadFactory @@ -25,12 +20,11 @@ import java.net.http.HttpResponse import java.time.Duration import java.util.* import java.util.concurrent.Executors +import java.util.concurrent.ScheduledExecutorService import java.util.concurrent.TimeUnit import kotlin.math.pow import kotlin.random.Random - -// Advice: always treat time as a Duration class PaymentExternalSystemAdapterImpl( private val properties: PaymentAccountProperties, private val paymentESService: EventSourcingService, @@ -38,24 +32,26 @@ class PaymentExternalSystemAdapterImpl( private val token: String, meterRegistry: MeterRegistry, private val parallelLimiter: Semaphore, - private val ioDispatcher: CoroutineDispatcher = Executors.newFixedThreadPool( - Runtime.getRuntime().availableProcessors() * 2, - NamedThreadFactory("payment-io-") - ).asCoroutineDispatcher() -) : PaymentExternalSystemAdapter { +) : PaymentExternalSystemAdapter, AutoCloseable { + companion object { val logger = LoggerFactory.getLogger(PaymentExternalSystemAdapter::class.java) val mapper = ObjectMapper().registerKotlinModule() } + private val sharedScheduler: ScheduledExecutorService = Executors.newScheduledThreadPool( + Runtime.getRuntime().availableProcessors() * 2, + NamedThreadFactory("payment-shared-${properties.accountName}-") + ) + + private val ioDispatcher: CoroutineDispatcher = sharedScheduler.asCoroutineDispatcher() + private val exceptionHandler = CoroutineExceptionHandler { _, throwable -> logger.error("[$accountName] Unhandled exception in payment adapter coroutine", throwable) } private val scope = CoroutineScope(SupervisorJob() + ioDispatcher + exceptionHandler) - - // 2025-11-20T20:30:35.780+03:00 INFO 56644 --- [alhost:1234/...] ru.quipy.core.EventSourcingService : Optimistic lock exception. Failed to save event records id: [7dca693e-e811-4b7f-8bce-23e13d952c04-4] private val startedRequests = meterRegistry.counter("payment.processing.started", "accountName", properties.accountName) private val timer = @@ -75,20 +71,14 @@ class PaymentExternalSystemAdapterImpl( private val httpClient = HttpClient .newBuilder() - .executor(Executors.newFixedThreadPool(parallelRequests)) + .executor(sharedScheduler) .version(HttpClient.Version.HTTP_2) .build() - private val retryScheduler = Executors.newScheduledThreadPool( - Runtime.getRuntime().availableProcessors() - ) - override fun performPaymentAsync(paymentId: UUID, amount: Int, paymentStartedAt: Long, deadline: Long) { logger.warn("[$accountName] Submitting payment request for payment $paymentId") val transactionId = UUID.randomUUID() - // Вне зависимости от исхода оплаты важно отметить что она была отправлена. - // Это требуется сделать ВО ВСЕХ СЛУЧАЯХ, поскольку эта информация используется сервисом тестирования. scope.launch { try { paymentESService.update(paymentId) { @@ -111,9 +101,12 @@ class PaymentExternalSystemAdapterImpl( val minRequiredTime = requestAverageProcessingTime.toMillis() * 2 if (remaining < minRequiredTime) { logger.warn("[$accountName] Not enough time for payment $paymentId: ${remaining}ms remaining, need ${minRequiredTime}ms") - paymentESService.update(paymentId) { - it.logProcessing(false, now(), transactionId, "not enough time") + runBlocking { + paymentESService.update(paymentId) { + it.logProcessing(false, now(), transactionId, "not enough time") + } } + val retryAfterMs = minRequiredTime - remaining + Random.nextLong(100) throw TooManyRequestsException(retryAfterMs) } @@ -185,74 +178,79 @@ class PaymentExternalSystemAdapterImpl( ) { httpClient.sendAsync(request, HttpResponse.BodyHandlers.ofString()) .whenComplete { response, throwable -> - scope.launch { - try { - if (throwable != null) { - val e = throwable.cause - when (throwable.cause) { - is SocketTimeoutException -> { - logger.warn("[$accountName] attempt ${retryCount + 1} timeout: $paymentId", e) - } + try { + if (throwable != null) { + val e = throwable.cause + when (throwable.cause) { + is SocketTimeoutException -> { + logger.warn("[$accountName] attempt ${retryCount + 1} timeout: $paymentId", e) + } - is InterruptedIOException -> { - logger.warn("[$accountName] interrupted: $paymentId", e) - } + is InterruptedIOException -> { + logger.warn("[$accountName] interrupted: $paymentId", e) + } - else -> { - logger.warn("[$accountName] io error: $paymentId", e) - } + else -> { + logger.warn("[$accountName] io error: $paymentId", e) } + } - if (retryCount + 1 >= 3) { + if (retryCount + 1 >= 3) { + runBlocking { paymentESService.update(paymentId) { it.logProcessing(false, now(), transactionId, "Max attempts reached") } - } else { - val backoff = ((2.0.pow(retryCount.toDouble()) * 25).toLong() + Random.nextLong(10)) - val capped = backoff.coerceAtMost(deadline - now() - 5) - if (capped <= 0) { + } + } else { + val backoff = ((2.0.pow(retryCount.toDouble()) * 25).toLong() + Random.nextLong(10)) + val capped = backoff.coerceAtMost(deadline - now() - 5) + if (capped <= 0) { + runBlocking { paymentESService.update(paymentId) { it.logProcessing(false, now(), transactionId, "Deadline expired") } - } else { - scheduleRetry( - retryCount, - request, - paymentId, - transactionId, - timeBeforeCall, - deadline, - capped - ) - return@launch } + } else { + scheduleRetry( + retryCount, + request, + paymentId, + transactionId, + timeBeforeCall, + deadline, + capped + ) + return@whenComplete } - } else { - logger.warn("Free space in semaphore: {}", parallelLimiter.availablePermits) - logger.info( - "success in callback for payment: {}, retry count: {}, in time: {}", - paymentId, - retryCount, - now() - ) - val rawBody = response.body() - val parsed = try { - mapper.readValue(rawBody, ExternalSysResponse::class.java) - } catch (ex: Exception) { - ExternalSysResponse(transactionId.toString(), paymentId.toString(), false, ex.message) - } + } + } else { + logger.warn("Free space in semaphore: {}", parallelLimiter.availablePermits) + logger.info( + "success in callback for payment: {}, retry count: {}, in time: {}", + paymentId, + retryCount, + now() + ) + val rawBody = response.body() + val parsed = try { + mapper.readValue(rawBody, ExternalSysResponse::class.java) + } catch (ex: Exception) { + ExternalSysResponse(transactionId.toString(), paymentId.toString(), false, ex.message) + } + runBlocking { paymentESService.update(paymentId) { it.logProcessing(parsed.result, now(), transactionId, parsed.message) } - timer.record(now() - timeBeforeCall, TimeUnit.MILLISECONDS) } - } catch (e: Exception) { - logger.error("[$accountName] Error processing payment $paymentId", e) - } finally { - startedRequests.increment() - parallelLimiter.release() + + timer.record(now() - timeBeforeCall, TimeUnit.MILLISECONDS) } + } catch (e: Exception) { + logger.error("[$accountName] Error processing payment $paymentId", e) + } finally { + startedRequests.increment() + parallelLimiter.release() } } } @@ -261,10 +259,10 @@ class PaymentExternalSystemAdapterImpl( retryCount: Long, request: HttpRequest, paymentId: UUID, transactionId: UUID, timeBeforeCall: Long, deadline: Long, delay: Long ) { - retryScheduler.schedule({ + sharedScheduler.schedule({ val remainingTime = deadline - now() if (remainingTime < requestAverageProcessingTime.toMillis()) { - scope.launch { + runBlocking { try { paymentESService.update(paymentId) { it.logProcessing(false, now(), transactionId, "Not enough time for retry") @@ -289,6 +287,18 @@ class PaymentExternalSystemAdapterImpl( completeAction(retryCount + 1, newRequest, paymentId, transactionId, timeBeforeCall, deadline) }, delay, TimeUnit.MILLISECONDS) } + + override fun close() { + sharedScheduler.shutdown() + try { + if (!sharedScheduler.awaitTermination(5, TimeUnit.SECONDS)) { + sharedScheduler.shutdownNow() + } + } catch (e: InterruptedException) { + sharedScheduler.shutdownNow() + Thread.currentThread().interrupt() + } + } } fun now() = System.currentTimeMillis() \ No newline at end of file From 9c1a210599c7e5a5267810ac3cd9c7cbd30fb261 Mon Sep 17 00:00:00 2001 From: 21092004Goda Date: Mon, 8 Dec 2025 01:07:22 +0300 Subject: [PATCH 17/24] Merge remote-tracking branch 'origin/hw-9-io-kuro' into hw-9-io-kuro # Conflicts: # src/main/kotlin/ru/quipy/payments/logic/PaymentExternalServiceImpl.kt --- .../logic/PaymentExternalServiceImpl.kt | 280 ++++++++++-------- 1 file changed, 160 insertions(+), 120 deletions(-) diff --git a/src/main/kotlin/ru/quipy/payments/logic/PaymentExternalServiceImpl.kt b/src/main/kotlin/ru/quipy/payments/logic/PaymentExternalServiceImpl.kt index 89502dc6d..463b7cf88 100644 --- a/src/main/kotlin/ru/quipy/payments/logic/PaymentExternalServiceImpl.kt +++ b/src/main/kotlin/ru/quipy/payments/logic/PaymentExternalServiceImpl.kt @@ -22,6 +22,7 @@ import java.util.* import java.util.concurrent.Executors import java.util.concurrent.ScheduledExecutorService import java.util.concurrent.TimeUnit +import java.util.concurrent.TimeoutException import kotlin.math.pow import kotlin.random.Random @@ -35,8 +36,8 @@ class PaymentExternalSystemAdapterImpl( ) : PaymentExternalSystemAdapter, AutoCloseable { companion object { - val logger = LoggerFactory.getLogger(PaymentExternalSystemAdapter::class.java) - val mapper = ObjectMapper().registerKotlinModule() + private val logger = LoggerFactory.getLogger(PaymentExternalSystemAdapter::class.java) + private val mapper = ObjectMapper().registerKotlinModule() } private val sharedScheduler: ScheduledExecutorService = Executors.newScheduledThreadPool( @@ -47,15 +48,14 @@ class PaymentExternalSystemAdapterImpl( private val ioDispatcher: CoroutineDispatcher = sharedScheduler.asCoroutineDispatcher() private val exceptionHandler = CoroutineExceptionHandler { _, throwable -> - logger.error("[$accountName] Unhandled exception in payment adapter coroutine", throwable) + logger.error("[${properties.accountName}] Unhandled exception in payment adapter coroutine", throwable) } private val scope = CoroutineScope(SupervisorJob() + ioDispatcher + exceptionHandler) - private val startedRequests = - meterRegistry.counter("payment.processing.started", "accountName", properties.accountName) - private val timer = - meterRegistry.timer("payment.external.system.request.latency", "accountName", properties.accountName) + private val startedRequests = meterRegistry.counter("payment.processing.started", "accountName", properties.accountName) + private val timer = meterRegistry.timer("payment.external.system.request.latency", "accountName", properties.accountName) + private val serviceName = properties.serviceName private val accountName = properties.accountName private val requestAverageProcessingTime = properties.averageProcessingTime @@ -76,7 +76,6 @@ class PaymentExternalSystemAdapterImpl( .build() override fun performPaymentAsync(paymentId: UUID, amount: Int, paymentStartedAt: Long, deadline: Long) { - logger.warn("[$accountName] Submitting payment request for payment $paymentId") val transactionId = UUID.randomUUID() scope.launch { @@ -101,7 +100,7 @@ class PaymentExternalSystemAdapterImpl( val minRequiredTime = requestAverageProcessingTime.toMillis() * 2 if (remaining < minRequiredTime) { logger.warn("[$accountName] Not enough time for payment $paymentId: ${remaining}ms remaining, need ${minRequiredTime}ms") - runBlocking { + scope.launch { paymentESService.update(paymentId) { it.logProcessing(false, now(), transactionId, "not enough time") } @@ -111,158 +110,185 @@ class PaymentExternalSystemAdapterImpl( throw TooManyRequestsException(retryAfterMs) } - val timeBeforeCall = now() - - if (!tryAcquire(now(), deadline - now())) { - val retryAfterMs = (requestAverageProcessingTime.toMillis() / parallelRequests * 10).coerceIn( - 10, - 100 - ) + Random.nextLong(10) + var acquired = false + try { + if (!tryAcquire(now(), deadline - now())) { + val retryAfterMs = (requestAverageProcessingTime.toMillis() * 2).coerceAtLeast(100) + Random.nextLong(50) + throw TooManyRequestsException(retryAfterMs) + } + acquired = true - throw TooManyRequestsException(retryAfterMs) - } + if (!rateLimit.tickBlockingWithTimeout(deadline - now())) { + val retryAfterMs = (1100L / rateLimitPerSec).coerceAtLeast(100) + Random.nextLong(50) + throw TooManyRequestsException(retryAfterMs) + } - if (!rateLimit.tickBlockingWithTimeout(deadline - now())) { - parallelLimiter.release() + val requestTimeout = minOf( + deadline - now(), + requestAverageProcessingTime.toMillis() * 2 + ).coerceAtLeast(100) - val retryAfterMs = (1000L / rateLimitPerSec * 10).coerceIn(10, 100) + Random.nextLong(10) - throw TooManyRequestsException(retryAfterMs) - } + if (requestTimeout < requestAverageProcessingTime.toMillis()) { + logger.warn("[$accountName] Timeout too short for payment $paymentId: ${requestTimeout}ms") + val retryAfterMs = requestAverageProcessingTime.toMillis() - requestTimeout + Random.nextLong(100) + throw TooManyRequestsException(retryAfterMs) + } - val requestTimeout = minOf( - deadline - now(), - requestAverageProcessingTime.toMillis() * 2 - ).coerceAtLeast(100) + val request = HttpRequest.newBuilder() + .uri(URI("http://$paymentProviderHostPort/external/process?serviceName=$serviceName&token=$token&accountName=$accountName&transactionId=$transactionId&paymentId=$paymentId&amount=$amount")) + .timeout(Duration.ofMillis(requestTimeout)) + .POST(HttpRequest.BodyPublishers.noBody()) + .build() - if (requestTimeout < requestAverageProcessingTime.toMillis()) { - logger.warn("[$accountName] Timeout too short for payment $paymentId: ${requestTimeout}ms") - parallelLimiter.release() - val retryAfterMs = requestAverageProcessingTime.toMillis() - requestTimeout + Random.nextLong(100) - throw TooManyRequestsException(retryAfterMs) + completeAction( + retryCount = 0, + request = request, + paymentId = paymentId, + transactionId = transactionId, + timeBeforeCall = now(), + deadline = deadline, + acquired = true + ) + + } catch (e: TooManyRequestsException) { + if (acquired) { + parallelLimiter.release() + acquired = false + } + throw e + } catch (e: Exception) { + if (acquired) { + parallelLimiter.release() + acquired = false + } + logger.error("[$accountName] Unexpected error in performPaymentAsync for $paymentId", e) + scope.launch { + paymentESService.update(paymentId) { + it.logProcessing(false, now(), transactionId, "unexpected error: ${e.message}") + } + } + throw e } - - val request = HttpRequest.newBuilder() - .uri(URI("http://$paymentProviderHostPort/external/process?serviceName=$serviceName&token=$token&accountName=$accountName&transactionId=$transactionId&paymentId=$paymentId&amount=$amount")) - .timeout(Duration.ofMillis(requestTimeout)) - .POST(HttpRequest.BodyPublishers.noBody()) - .build() - - val retryCount = 0L - - completeAction(retryCount, request, paymentId, transactionId, timeBeforeCall, deadline) } - override fun price() = properties.price - - override fun isEnabled() = properties.enabled - - override fun name() = properties.accountName - - fun tryAcquire(startedAt: Long, remaining: Long): Boolean { + private fun tryAcquire(startedAt: Long, remaining: Long): Boolean { var isAcquired = parallelLimiter.tryAcquire() while (!isAcquired && now() - startedAt < remaining) { - isAcquired = parallelLimiter.tryAcquire() Thread.sleep(1) + isAcquired = parallelLimiter.tryAcquire() } - return isAcquired } - fun completeAction( + private fun completeAction( retryCount: Long, request: HttpRequest, paymentId: UUID, transactionId: UUID, timeBeforeCall: Long, - deadline: Long + deadline: Long, + acquired: Boolean ) { - httpClient.sendAsync(request, HttpResponse.BodyHandlers.ofString()) - .whenComplete { response, throwable -> - try { - if (throwable != null) { - val e = throwable.cause - when (throwable.cause) { - is SocketTimeoutException -> { - logger.warn("[$accountName] attempt ${retryCount + 1} timeout: $paymentId", e) - } + val requestTimeout = request.timeout().orElse(Duration.ofSeconds(10)).toMillis() + val future = httpClient.sendAsync(request, HttpResponse.BodyHandlers.ofString()) + .orTimeout(requestTimeout, TimeUnit.MILLISECONDS) - is InterruptedIOException -> { - logger.warn("[$accountName] interrupted: $paymentId", e) - } + future.whenComplete { response, throwable -> + try { + if (throwable != null) { + val cause = throwable.cause + val isTimeout = cause is SocketTimeoutException || throwable is TimeoutException + val isInterrupted = cause is InterruptedIOException + + val errorType = when { + isTimeout -> "timeout" + isInterrupted -> "interrupted" + else -> "io_error" + } + logger.warn("[$accountName] attempt ${retryCount + 1} $errorType: $paymentId", throwable) - else -> { - logger.warn("[$accountName] io error: $paymentId", e) + if (retryCount >= 2) { + scope.launch { + paymentESService.update(paymentId) { + it.logProcessing(false, now(), transactionId, "Max attempts reached") } } - - if (retryCount + 1 >= 3) { - runBlocking { + } else { + val backoff = ((2.0.pow(retryCount.toDouble()) * 25).toLong() + Random.nextLong(10)) + val capped = backoff.coerceAtMost(deadline - now() - 10) + if (capped <= 0) { + scope.launch { paymentESService.update(paymentId) { - it.logProcessing(false, now(), transactionId, "Max attempts reached") + it.logProcessing(false, now(), transactionId, "Deadline expired") } } } else { - val backoff = ((2.0.pow(retryCount.toDouble()) * 25).toLong() + Random.nextLong(10)) - val capped = backoff.coerceAtMost(deadline - now() - 5) - if (capped <= 0) { - runBlocking { - paymentESService.update(paymentId) { - it.logProcessing(false, now(), transactionId, "Deadline expired") - } - } - } else { - scheduleRetry( - retryCount, - request, - paymentId, - transactionId, - timeBeforeCall, - deadline, - capped - ) - return@whenComplete - } - } - } else { - logger.warn("Free space in semaphore: {}", parallelLimiter.availablePermits) - logger.info( - "success in callback for payment: {}, retry count: {}, in time: {}", - paymentId, - retryCount, - now() - ) - val rawBody = response.body() - val parsed = try { - mapper.readValue(rawBody, ExternalSysResponse::class.java) - } catch (ex: Exception) { - ExternalSysResponse(transactionId.toString(), paymentId.toString(), false, ex.message) + scheduleRetry( + retryCount = retryCount + 1, + request = request, + paymentId = paymentId, + transactionId = transactionId, + timeBeforeCall = timeBeforeCall, + deadline = deadline, + delay = capped, + acquired = acquired + ) + return@whenComplete } + } + } else { + logger.debug("[$accountName] Free space in semaphore: ${parallelLimiter.availablePermits}") + logger.info( + "[$accountName] success for payment: {}, retry count: {}, in time: {}", + paymentId, + retryCount, + now() + ) + val rawBody = response.body() + val parsed = try { + mapper.readValue(rawBody, ExternalSysResponse::class.java) + } catch (ex: Exception) { + ExternalSysResponse(transactionId.toString(), paymentId.toString(), false, ex.message) + } - runBlocking { - paymentESService.update(paymentId) { - it.logProcessing(parsed.result, now(), transactionId, parsed.message) - } + scope.launch { + paymentESService.update(paymentId) { + it.logProcessing(parsed.result, now(), transactionId, parsed.message) } + } - timer.record(now() - timeBeforeCall, TimeUnit.MILLISECONDS) + timer.record(now() - timeBeforeCall, TimeUnit.MILLISECONDS) + } + } catch (e: Exception) { + logger.error("[$accountName] Error in whenComplete for payment $paymentId", e) + scope.launch { + paymentESService.update(paymentId) { + it.logProcessing(false, now(), transactionId, "callback error: ${e.message}") } - } catch (e: Exception) { - logger.error("[$accountName] Error processing payment $paymentId", e) - } finally { - startedRequests.increment() + } + } finally { + startedRequests.increment() + if (acquired) { parallelLimiter.release() } } + } } private fun scheduleRetry( - retryCount: Long, request: HttpRequest, paymentId: UUID, - transactionId: UUID, timeBeforeCall: Long, deadline: Long, delay: Long + retryCount: Long, + request: HttpRequest, + paymentId: UUID, + transactionId: UUID, + timeBeforeCall: Long, + deadline: Long, + delay: Long, + acquired: Boolean ) { sharedScheduler.schedule({ val remainingTime = deadline - now() if (remainingTime < requestAverageProcessingTime.toMillis()) { - runBlocking { + scope.launch { try { paymentESService.update(paymentId) { it.logProcessing(false, now(), transactionId, "Not enough time for retry") @@ -271,23 +297,37 @@ class PaymentExternalSystemAdapterImpl( logger.error("[$accountName] Failed to record retry failure for $paymentId", e) } finally { startedRequests.increment() - parallelLimiter.release() + if (acquired) parallelLimiter.release() } } return@schedule } - val newRequestTimeout = remainingTime.coerceIn(100, requestAverageProcessingTime.toMillis() * 2) + val newRequestTimeout = minOf(remainingTime, requestAverageProcessingTime.toMillis() * 2).coerceAtLeast(100) val newRequest = HttpRequest.newBuilder() .uri(request.uri()) .timeout(Duration.ofMillis(newRequestTimeout)) .POST(HttpRequest.BodyPublishers.noBody()) .build() - completeAction(retryCount + 1, newRequest, paymentId, transactionId, timeBeforeCall, deadline) + completeAction( + retryCount = retryCount, + request = newRequest, + paymentId = paymentId, + transactionId = transactionId, + timeBeforeCall = timeBeforeCall, + deadline = deadline, + acquired = acquired + ) }, delay, TimeUnit.MILLISECONDS) } + override fun price() = properties.price + + override fun isEnabled() = properties.enabled + + override fun name() = properties.accountName + override fun close() { sharedScheduler.shutdown() try { From df2df2901a836213c36a9ac82c5374331f9980ba Mon Sep 17 00:00:00 2001 From: 21092004Goda Date: Mon, 8 Dec 2025 01:14:48 +0300 Subject: [PATCH 18/24] Merge remote-tracking branch 'origin/hw-9-io-kuro' into hw-9-io-kuro # Conflicts: # src/main/kotlin/ru/quipy/payments/logic/PaymentExternalServiceImpl.kt --- .../ru/quipy/payments/logic/PaymentExternalServiceImpl.kt | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/src/main/kotlin/ru/quipy/payments/logic/PaymentExternalServiceImpl.kt b/src/main/kotlin/ru/quipy/payments/logic/PaymentExternalServiceImpl.kt index 463b7cf88..cd9417bcb 100644 --- a/src/main/kotlin/ru/quipy/payments/logic/PaymentExternalServiceImpl.kt +++ b/src/main/kotlin/ru/quipy/payments/logic/PaymentExternalServiceImpl.kt @@ -36,8 +36,8 @@ class PaymentExternalSystemAdapterImpl( ) : PaymentExternalSystemAdapter, AutoCloseable { companion object { - private val logger = LoggerFactory.getLogger(PaymentExternalSystemAdapter::class.java) - private val mapper = ObjectMapper().registerKotlinModule() + val logger = LoggerFactory.getLogger(PaymentExternalSystemAdapter::class.java) + val mapper = ObjectMapper().registerKotlinModule() } private val sharedScheduler: ScheduledExecutorService = Executors.newScheduledThreadPool( From 2a3c72ea007a6e1ec648e47424b22730dbeaf57b Mon Sep 17 00:00:00 2001 From: 21092004Goda Date: Mon, 8 Dec 2025 01:18:45 +0300 Subject: [PATCH 19/24] Merge remote-tracking branch 'origin/hw-9-io-kuro' into hw-9-io-kuro # Conflicts: # src/main/kotlin/ru/quipy/payments/logic/PaymentExternalServiceImpl.kt --- .../logic/PaymentExternalServiceImpl.kt | 326 ++++++++---------- 1 file changed, 138 insertions(+), 188 deletions(-) diff --git a/src/main/kotlin/ru/quipy/payments/logic/PaymentExternalServiceImpl.kt b/src/main/kotlin/ru/quipy/payments/logic/PaymentExternalServiceImpl.kt index cd9417bcb..98898467c 100644 --- a/src/main/kotlin/ru/quipy/payments/logic/PaymentExternalServiceImpl.kt +++ b/src/main/kotlin/ru/quipy/payments/logic/PaymentExternalServiceImpl.kt @@ -3,7 +3,12 @@ package ru.quipy.payments.logic import com.fasterxml.jackson.databind.ObjectMapper import com.fasterxml.jackson.module.kotlin.registerKotlinModule import io.micrometer.core.instrument.MeterRegistry -import kotlinx.coroutines.* +import kotlinx.coroutines.CoroutineDispatcher +import kotlinx.coroutines.CoroutineExceptionHandler +import kotlinx.coroutines.CoroutineScope +import kotlinx.coroutines.SupervisorJob +import kotlinx.coroutines.asCoroutineDispatcher +import kotlinx.coroutines.launch import kotlinx.coroutines.sync.Semaphore import org.slf4j.LoggerFactory import ru.quipy.common.utils.NamedThreadFactory @@ -20,12 +25,12 @@ import java.net.http.HttpResponse import java.time.Duration import java.util.* import java.util.concurrent.Executors -import java.util.concurrent.ScheduledExecutorService import java.util.concurrent.TimeUnit -import java.util.concurrent.TimeoutException import kotlin.math.pow import kotlin.random.Random + +// Advice: always treat time as a Duration class PaymentExternalSystemAdapterImpl( private val properties: PaymentAccountProperties, private val paymentESService: EventSourcingService, @@ -33,29 +38,28 @@ class PaymentExternalSystemAdapterImpl( private val token: String, meterRegistry: MeterRegistry, private val parallelLimiter: Semaphore, -) : PaymentExternalSystemAdapter, AutoCloseable { - + private val ioDispatcher: CoroutineDispatcher = Executors.newFixedThreadPool( + (Runtime.getRuntime().availableProcessors() * 2.5).toInt(), + NamedThreadFactory("payment-io-") + ).asCoroutineDispatcher() +) : PaymentExternalSystemAdapter { companion object { val logger = LoggerFactory.getLogger(PaymentExternalSystemAdapter::class.java) val mapper = ObjectMapper().registerKotlinModule() } - private val sharedScheduler: ScheduledExecutorService = Executors.newScheduledThreadPool( - Runtime.getRuntime().availableProcessors() * 2, - NamedThreadFactory("payment-shared-${properties.accountName}-") - ) - - private val ioDispatcher: CoroutineDispatcher = sharedScheduler.asCoroutineDispatcher() - private val exceptionHandler = CoroutineExceptionHandler { _, throwable -> - logger.error("[${properties.accountName}] Unhandled exception in payment adapter coroutine", throwable) + logger.error("[$accountName] Unhandled exception in payment adapter coroutine", throwable) } private val scope = CoroutineScope(SupervisorJob() + ioDispatcher + exceptionHandler) - private val startedRequests = meterRegistry.counter("payment.processing.started", "accountName", properties.accountName) - private val timer = meterRegistry.timer("payment.external.system.request.latency", "accountName", properties.accountName) + // 2025-11-20T20:30:35.780+03:00 INFO 56644 --- [alhost:1234/...] ru.quipy.core.EventSourcingService : Optimistic lock exception. Failed to save event records id: [7dca693e-e811-4b7f-8bce-23e13d952c04-4] + private val startedRequests = + meterRegistry.counter("payment.processing.started", "accountName", properties.accountName) + private val timer = + meterRegistry.timer("payment.external.system.request.latency", "accountName", properties.accountName) private val serviceName = properties.serviceName private val accountName = properties.accountName private val requestAverageProcessingTime = properties.averageProcessingTime @@ -71,13 +75,20 @@ class PaymentExternalSystemAdapterImpl( private val httpClient = HttpClient .newBuilder() - .executor(sharedScheduler) + .executor(Executors.newFixedThreadPool(parallelRequests)) .version(HttpClient.Version.HTTP_2) .build() + private val retryScheduler = Executors.newScheduledThreadPool( + Runtime.getRuntime().availableProcessors() + ) + override fun performPaymentAsync(paymentId: UUID, amount: Int, paymentStartedAt: Long, deadline: Long) { + logger.warn("[$accountName] Submitting payment request for payment $paymentId") val transactionId = UUID.randomUUID() + // Вне зависимости от исхода оплаты важно отметить что она была отправлена. + // Это требуется сделать ВО ВСЕХ СЛУЧАЯХ, поскольку эта информация используется сервисом тестирования. scope.launch { try { paymentESService.update(paymentId) { @@ -100,192 +111,157 @@ class PaymentExternalSystemAdapterImpl( val minRequiredTime = requestAverageProcessingTime.toMillis() * 2 if (remaining < minRequiredTime) { logger.warn("[$accountName] Not enough time for payment $paymentId: ${remaining}ms remaining, need ${minRequiredTime}ms") - scope.launch { - paymentESService.update(paymentId) { - it.logProcessing(false, now(), transactionId, "not enough time") - } + paymentESService.update(paymentId) { + it.logProcessing(false, now(), transactionId, "not enough time") } - val retryAfterMs = minRequiredTime - remaining + Random.nextLong(100) throw TooManyRequestsException(retryAfterMs) } - var acquired = false - try { - if (!tryAcquire(now(), deadline - now())) { - val retryAfterMs = (requestAverageProcessingTime.toMillis() * 2).coerceAtLeast(100) + Random.nextLong(50) - throw TooManyRequestsException(retryAfterMs) - } - acquired = true + val timeBeforeCall = now() - if (!rateLimit.tickBlockingWithTimeout(deadline - now())) { - val retryAfterMs = (1100L / rateLimitPerSec).coerceAtLeast(100) + Random.nextLong(50) - throw TooManyRequestsException(retryAfterMs) - } + if (!tryAcquire(now(), deadline - now())) { + val retryAfterMs = (requestAverageProcessingTime.toMillis() / parallelRequests * 10).coerceIn( + 10, + 100 + ) + Random.nextLong(10) - val requestTimeout = minOf( - deadline - now(), - requestAverageProcessingTime.toMillis() * 2 - ).coerceAtLeast(100) + throw TooManyRequestsException(retryAfterMs) + } - if (requestTimeout < requestAverageProcessingTime.toMillis()) { - logger.warn("[$accountName] Timeout too short for payment $paymentId: ${requestTimeout}ms") - val retryAfterMs = requestAverageProcessingTime.toMillis() - requestTimeout + Random.nextLong(100) - throw TooManyRequestsException(retryAfterMs) - } + if (!rateLimit.tickBlockingWithTimeout(deadline - now())) { + parallelLimiter.release() - val request = HttpRequest.newBuilder() - .uri(URI("http://$paymentProviderHostPort/external/process?serviceName=$serviceName&token=$token&accountName=$accountName&transactionId=$transactionId&paymentId=$paymentId&amount=$amount")) - .timeout(Duration.ofMillis(requestTimeout)) - .POST(HttpRequest.BodyPublishers.noBody()) - .build() + val retryAfterMs = (1000L / rateLimitPerSec * 10).coerceIn(10, 100) + Random.nextLong(10) + throw TooManyRequestsException(retryAfterMs) + } - completeAction( - retryCount = 0, - request = request, - paymentId = paymentId, - transactionId = transactionId, - timeBeforeCall = now(), - deadline = deadline, - acquired = true - ) - - } catch (e: TooManyRequestsException) { - if (acquired) { - parallelLimiter.release() - acquired = false - } - throw e - } catch (e: Exception) { - if (acquired) { - parallelLimiter.release() - acquired = false - } - logger.error("[$accountName] Unexpected error in performPaymentAsync for $paymentId", e) - scope.launch { - paymentESService.update(paymentId) { - it.logProcessing(false, now(), transactionId, "unexpected error: ${e.message}") - } - } - throw e + val requestTimeout = minOf( + deadline - now(), + requestAverageProcessingTime.toMillis() * 2 + ).coerceAtLeast(100) + + if (requestTimeout < requestAverageProcessingTime.toMillis()) { + logger.warn("[$accountName] Timeout too short for payment $paymentId: ${requestTimeout}ms") + parallelLimiter.release() + val retryAfterMs = requestAverageProcessingTime.toMillis() - requestTimeout + Random.nextLong(100) + throw TooManyRequestsException(retryAfterMs) } + + val request = HttpRequest.newBuilder() + .uri(URI("http://$paymentProviderHostPort/external/process?serviceName=$serviceName&token=$token&accountName=$accountName&transactionId=$transactionId&paymentId=$paymentId&amount=$amount")) + .timeout(Duration.ofMillis(requestTimeout)) + .POST(HttpRequest.BodyPublishers.noBody()) + .build() + + val retryCount = 0L + + completeAction(retryCount, request, paymentId, transactionId, timeBeforeCall, deadline) } - private fun tryAcquire(startedAt: Long, remaining: Long): Boolean { + override fun price() = properties.price + + override fun isEnabled() = properties.enabled + + override fun name() = properties.accountName + + fun tryAcquire(startedAt: Long, remaining: Long): Boolean { var isAcquired = parallelLimiter.tryAcquire() while (!isAcquired && now() - startedAt < remaining) { - Thread.sleep(1) isAcquired = parallelLimiter.tryAcquire() + Thread.sleep(1) } + return isAcquired } - private fun completeAction( + fun completeAction( retryCount: Long, request: HttpRequest, paymentId: UUID, transactionId: UUID, timeBeforeCall: Long, - deadline: Long, - acquired: Boolean + deadline: Long ) { - val requestTimeout = request.timeout().orElse(Duration.ofSeconds(10)).toMillis() - val future = httpClient.sendAsync(request, HttpResponse.BodyHandlers.ofString()) - .orTimeout(requestTimeout, TimeUnit.MILLISECONDS) + httpClient.sendAsync(request, HttpResponse.BodyHandlers.ofString()) + .whenComplete { response, throwable -> + scope.launch { + try { + if (throwable != null) { + val e = throwable.cause + when (throwable.cause) { + is SocketTimeoutException -> { + logger.warn("[$accountName] attempt ${retryCount + 1} timeout: $paymentId", e) + } - future.whenComplete { response, throwable -> - try { - if (throwable != null) { - val cause = throwable.cause - val isTimeout = cause is SocketTimeoutException || throwable is TimeoutException - val isInterrupted = cause is InterruptedIOException - - val errorType = when { - isTimeout -> "timeout" - isInterrupted -> "interrupted" - else -> "io_error" - } - logger.warn("[$accountName] attempt ${retryCount + 1} $errorType: $paymentId", throwable) + is InterruptedIOException -> { + logger.warn("[$accountName] interrupted: $paymentId", e) + } - if (retryCount >= 2) { - scope.launch { - paymentESService.update(paymentId) { - it.logProcessing(false, now(), transactionId, "Max attempts reached") + else -> { + logger.warn("[$accountName] io error: $paymentId", e) + } } - } - } else { - val backoff = ((2.0.pow(retryCount.toDouble()) * 25).toLong() + Random.nextLong(10)) - val capped = backoff.coerceAtMost(deadline - now() - 10) - if (capped <= 0) { - scope.launch { + + if (retryCount + 1 >= 3) { paymentESService.update(paymentId) { - it.logProcessing(false, now(), transactionId, "Deadline expired") + it.logProcessing(false, now(), transactionId, "Max attempts reached") + } + } else { + val backoff = ((2.0.pow(retryCount.toDouble()) * 25).toLong() + Random.nextLong(10)) + val capped = backoff.coerceAtMost(deadline - now() - 5) + if (capped <= 0) { + paymentESService.update(paymentId) { + it.logProcessing(false, now(), transactionId, "Deadline expired") + } + } else { + scheduleRetry( + retryCount, + request, + paymentId, + transactionId, + timeBeforeCall, + deadline, + capped + ) + return@launch } } } else { - scheduleRetry( - retryCount = retryCount + 1, - request = request, - paymentId = paymentId, - transactionId = transactionId, - timeBeforeCall = timeBeforeCall, - deadline = deadline, - delay = capped, - acquired = acquired + logger.warn("Free space in semaphore: {}", parallelLimiter.availablePermits) + logger.info( + "success in callback for payment: {}, retry count: {}, in time: {}", + paymentId, + retryCount, + now() ) - return@whenComplete - } - } - } else { - logger.debug("[$accountName] Free space in semaphore: ${parallelLimiter.availablePermits}") - logger.info( - "[$accountName] success for payment: {}, retry count: {}, in time: {}", - paymentId, - retryCount, - now() - ) - val rawBody = response.body() - val parsed = try { - mapper.readValue(rawBody, ExternalSysResponse::class.java) - } catch (ex: Exception) { - ExternalSysResponse(transactionId.toString(), paymentId.toString(), false, ex.message) - } + val rawBody = response.body() + val parsed = try { + mapper.readValue(rawBody, ExternalSysResponse::class.java) + } catch (ex: Exception) { + ExternalSysResponse(transactionId.toString(), paymentId.toString(), false, ex.message) + } - scope.launch { - paymentESService.update(paymentId) { - it.logProcessing(parsed.result, now(), transactionId, parsed.message) + paymentESService.update(paymentId) { + it.logProcessing(parsed.result, now(), transactionId, parsed.message) + } + timer.record(now() - timeBeforeCall, TimeUnit.MILLISECONDS) } - } - - timer.record(now() - timeBeforeCall, TimeUnit.MILLISECONDS) - } - } catch (e: Exception) { - logger.error("[$accountName] Error in whenComplete for payment $paymentId", e) - scope.launch { - paymentESService.update(paymentId) { - it.logProcessing(false, now(), transactionId, "callback error: ${e.message}") + } catch (e: Exception) { + logger.error("[$accountName] Error processing payment $paymentId", e) + } finally { + startedRequests.increment() + parallelLimiter.release() } } - } finally { - startedRequests.increment() - if (acquired) { - parallelLimiter.release() - } } - } } private fun scheduleRetry( - retryCount: Long, - request: HttpRequest, - paymentId: UUID, - transactionId: UUID, - timeBeforeCall: Long, - deadline: Long, - delay: Long, - acquired: Boolean + retryCount: Long, request: HttpRequest, paymentId: UUID, + transactionId: UUID, timeBeforeCall: Long, deadline: Long, delay: Long ) { - sharedScheduler.schedule({ + retryScheduler.schedule({ val remainingTime = deadline - now() if (remainingTime < requestAverageProcessingTime.toMillis()) { scope.launch { @@ -297,48 +273,22 @@ class PaymentExternalSystemAdapterImpl( logger.error("[$accountName] Failed to record retry failure for $paymentId", e) } finally { startedRequests.increment() - if (acquired) parallelLimiter.release() + parallelLimiter.release() } } return@schedule } - val newRequestTimeout = minOf(remainingTime, requestAverageProcessingTime.toMillis() * 2).coerceAtLeast(100) + val newRequestTimeout = remainingTime.coerceIn(100, requestAverageProcessingTime.toMillis() * 2) val newRequest = HttpRequest.newBuilder() .uri(request.uri()) .timeout(Duration.ofMillis(newRequestTimeout)) .POST(HttpRequest.BodyPublishers.noBody()) .build() - completeAction( - retryCount = retryCount, - request = newRequest, - paymentId = paymentId, - transactionId = transactionId, - timeBeforeCall = timeBeforeCall, - deadline = deadline, - acquired = acquired - ) + completeAction(retryCount + 1, newRequest, paymentId, transactionId, timeBeforeCall, deadline) }, delay, TimeUnit.MILLISECONDS) } - - override fun price() = properties.price - - override fun isEnabled() = properties.enabled - - override fun name() = properties.accountName - - override fun close() { - sharedScheduler.shutdown() - try { - if (!sharedScheduler.awaitTermination(5, TimeUnit.SECONDS)) { - sharedScheduler.shutdownNow() - } - } catch (e: InterruptedException) { - sharedScheduler.shutdownNow() - Thread.currentThread().interrupt() - } - } } fun now() = System.currentTimeMillis() \ No newline at end of file From 824c90ecd165259a2c295c695586a9b15590ee94 Mon Sep 17 00:00:00 2001 From: 21092004Goda Date: Mon, 8 Dec 2025 01:27:12 +0300 Subject: [PATCH 20/24] Merge remote-tracking branch 'origin/hw-9-io-kuro' into hw-9-io-kuro # Conflicts: # src/main/kotlin/ru/quipy/payments/logic/PaymentExternalServiceImpl.kt --- .../ru/quipy/payments/logic/PaymentExternalServiceImpl.kt | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/main/kotlin/ru/quipy/payments/logic/PaymentExternalServiceImpl.kt b/src/main/kotlin/ru/quipy/payments/logic/PaymentExternalServiceImpl.kt index 98898467c..086cf25dc 100644 --- a/src/main/kotlin/ru/quipy/payments/logic/PaymentExternalServiceImpl.kt +++ b/src/main/kotlin/ru/quipy/payments/logic/PaymentExternalServiceImpl.kt @@ -39,7 +39,7 @@ class PaymentExternalSystemAdapterImpl( meterRegistry: MeterRegistry, private val parallelLimiter: Semaphore, private val ioDispatcher: CoroutineDispatcher = Executors.newFixedThreadPool( - (Runtime.getRuntime().availableProcessors() * 2.5).toInt(), + Runtime.getRuntime().availableProcessors() * 2, NamedThreadFactory("payment-io-") ).asCoroutineDispatcher() ) : PaymentExternalSystemAdapter { From 765e50f84e82ffb2534cc788065a50979ab3b9ce Mon Sep 17 00:00:00 2001 From: 21092004Goda Date: Mon, 8 Dec 2025 01:33:02 +0300 Subject: [PATCH 21/24] Merge remote-tracking branch 'origin/hw-9-io-kuro' into hw-9-io-kuro # Conflicts: # src/main/kotlin/ru/quipy/payments/logic/PaymentExternalServiceImpl.kt --- .../logic/PaymentExternalServiceImpl.kt | 381 ++++++++++-------- 1 file changed, 213 insertions(+), 168 deletions(-) diff --git a/src/main/kotlin/ru/quipy/payments/logic/PaymentExternalServiceImpl.kt b/src/main/kotlin/ru/quipy/payments/logic/PaymentExternalServiceImpl.kt index 086cf25dc..e92d13475 100644 --- a/src/main/kotlin/ru/quipy/payments/logic/PaymentExternalServiceImpl.kt +++ b/src/main/kotlin/ru/quipy/payments/logic/PaymentExternalServiceImpl.kt @@ -3,13 +3,10 @@ package ru.quipy.payments.logic import com.fasterxml.jackson.databind.ObjectMapper import com.fasterxml.jackson.module.kotlin.registerKotlinModule import io.micrometer.core.instrument.MeterRegistry -import kotlinx.coroutines.CoroutineDispatcher -import kotlinx.coroutines.CoroutineExceptionHandler -import kotlinx.coroutines.CoroutineScope -import kotlinx.coroutines.SupervisorJob +import kotlinx.coroutines.* import kotlinx.coroutines.asCoroutineDispatcher -import kotlinx.coroutines.launch import kotlinx.coroutines.sync.Semaphore +import kotlinx.coroutines.sync.withPermit import org.slf4j.LoggerFactory import ru.quipy.common.utils.NamedThreadFactory import ru.quipy.common.utils.SlidingWindowRateLimiter @@ -25,12 +22,12 @@ import java.net.http.HttpResponse import java.time.Duration import java.util.* import java.util.concurrent.Executors +import java.util.concurrent.ScheduledFuture import java.util.concurrent.TimeUnit import kotlin.math.pow import kotlin.random.Random +import kotlin.time.Duration.Companion.milliseconds - -// Advice: always treat time as a Duration class PaymentExternalSystemAdapterImpl( private val properties: PaymentAccountProperties, private val paymentESService: EventSourcingService, @@ -54,12 +51,17 @@ class PaymentExternalSystemAdapterImpl( private val scope = CoroutineScope(SupervisorJob() + ioDispatcher + exceptionHandler) - - // 2025-11-20T20:30:35.780+03:00 INFO 56644 --- [alhost:1234/...] ru.quipy.core.EventSourcingService : Optimistic lock exception. Failed to save event records id: [7dca693e-e811-4b7f-8bce-23e13d952c04-4] private val startedRequests = meterRegistry.counter("payment.processing.started", "accountName", properties.accountName) private val timer = meterRegistry.timer("payment.external.system.request.latency", "accountName", properties.accountName) + private val successCounter = + meterRegistry.counter("payment.processing.success", "accountName", properties.accountName) + private val failureCounter = + meterRegistry.counter("payment.processing.failure", "accountName", properties.accountName) + private val timeoutCounter = + meterRegistry.counter("payment.processing.timeout", "accountName", properties.accountName) + private val serviceName = properties.serviceName private val accountName = properties.accountName private val requestAverageProcessingTime = properties.averageProcessingTime @@ -77,27 +79,30 @@ class PaymentExternalSystemAdapterImpl( .newBuilder() .executor(Executors.newFixedThreadPool(parallelRequests)) .version(HttpClient.Version.HTTP_2) + .connectTimeout(Duration.ofSeconds(10)) .build() private val retryScheduler = Executors.newScheduledThreadPool( Runtime.getRuntime().availableProcessors() ) + private val pendingRequests = Collections.synchronizedMap(mutableMapOf>()) + override fun performPaymentAsync(paymentId: UUID, amount: Int, paymentStartedAt: Long, deadline: Long) { - logger.warn("[$accountName] Submitting payment request for payment $paymentId") + logger.info("[$accountName] Submitting payment request for payment $paymentId") val transactionId = UUID.randomUUID() - // Вне зависимости от исхода оплаты важно отметить что она была отправлена. - // Это требуется сделать ВО ВСЕХ СЛУЧАЯХ, поскольку эта информация используется сервисом тестирования. scope.launch { try { - paymentESService.update(paymentId) { - it.logSubmission( - success = true, - transactionId, - now(), - Duration.ofMillis(now() - paymentStartedAt) - ) + withTimeout(deadline - System.currentTimeMillis()) { + paymentESService.update(paymentId) { + it.logSubmission( + success = true, + transactionId, + now(), + Duration.ofMillis(now() - paymentStartedAt) + ) + } } logger.info("[$accountName] Log submission recorded for $paymentId") } catch (e: Exception) { @@ -105,189 +110,229 @@ class PaymentExternalSystemAdapterImpl( } } - logger.info("[$accountName] Submit: $paymentId , txId: $transactionId") - - val remaining = deadline - now() - val minRequiredTime = requestAverageProcessingTime.toMillis() * 2 - if (remaining < minRequiredTime) { - logger.warn("[$accountName] Not enough time for payment $paymentId: ${remaining}ms remaining, need ${minRequiredTime}ms") - paymentESService.update(paymentId) { - it.logProcessing(false, now(), transactionId, "not enough time") + scope.launch { + try { + executePaymentWithRetries(paymentId, transactionId, amount, deadline) + } catch (e: Exception) { + logger.error("[$accountName] Failed to process payment $paymentId", e) + handlePaymentFailure(paymentId, transactionId, e) } - val retryAfterMs = minRequiredTime - remaining + Random.nextLong(100) - throw TooManyRequestsException(retryAfterMs) } + } + + private suspend fun executePaymentWithRetries( + paymentId: UUID, + transactionId: UUID, + amount: Int, + deadline: Long + ) { + var retryCount = 0 + val maxRetries = 3 - val timeBeforeCall = now() + while (retryCount <= maxRetries) { + try { + checkDeadline(deadline, paymentId) + + val result = executePaymentAttempt(paymentId, transactionId, amount, deadline) + + if (result) { + successCounter.increment() + return + } else { + retryCount++ + if (retryCount <= maxRetries) { + val backoff = calculateBackoff(retryCount, deadline) + delay(backoff) + } + } + } catch (e: TooManyRequestsException) { + throw e + } catch (e: CancellationException) { + logger.warn("[$accountName] Payment $paymentId was cancelled") + throw e + } catch (e: Exception) { + logger.warn("[$accountName] Attempt ${retryCount + 1} failed for payment $paymentId", e) + retryCount++ - if (!tryAcquire(now(), deadline - now())) { - val retryAfterMs = (requestAverageProcessingTime.toMillis() / parallelRequests * 10).coerceIn( - 10, - 100 - ) + Random.nextLong(10) + if (retryCount > maxRetries) { + throw e + } - throw TooManyRequestsException(retryAfterMs) + val backoff = calculateBackoff(retryCount, deadline) + delay(backoff) + } } - if (!rateLimit.tickBlockingWithTimeout(deadline - now())) { - parallelLimiter.release() + throw RuntimeException("All retry attempts exhausted for payment $paymentId") + } - val retryAfterMs = (1000L / rateLimitPerSec * 10).coerceIn(10, 100) + Random.nextLong(10) - throw TooManyRequestsException(retryAfterMs) - } + private suspend fun executePaymentAttempt( + paymentId: UUID, + transactionId: UUID, + amount: Int, + deadline: Long + ): Boolean = withTimeoutOrNull(deadline - System.currentTimeMillis()) { + try { + val semaphoreAcquired = withTimeoutOrNull(deadline - System.currentTimeMillis()) { + parallelLimiter.tryAcquire() + if (!parallelLimiter.tryAcquire()) { + val waitTime = minOf(deadline - System.currentTimeMillis(), 100L) + if (waitTime > 0) { + delay(waitTime) + } + parallelLimiter.tryAcquire() + } else { + true + } + } ?: false - val requestTimeout = minOf( - deadline - now(), - requestAverageProcessingTime.toMillis() * 2 - ).coerceAtLeast(100) + if (!semaphoreAcquired) { + val retryAfter = (requestAverageProcessingTime.toMillis() / parallelRequests).coerceIn(10, 100) + throw TooManyRequestsException(retryAfter + Random.nextLong(10)) + } + + val rateLimitAcquired = withTimeoutOrNull(deadline - System.currentTimeMillis()) { + rateLimit.tickBlockingWithTimeout(deadline - System.currentTimeMillis()) + } ?: false + + if (!rateLimitAcquired) { + parallelLimiter.release() + val retryAfter = (1000L / rateLimitPerSec).coerceIn(10, 100) + throw TooManyRequestsException(retryAfter + Random.nextLong(10)) + } + + val requestTimeout = calculateRequestTimeout(deadline) - if (requestTimeout < requestAverageProcessingTime.toMillis()) { - logger.warn("[$accountName] Timeout too short for payment $paymentId: ${requestTimeout}ms") - parallelLimiter.release() - val retryAfterMs = requestAverageProcessingTime.toMillis() - requestTimeout + Random.nextLong(100) - throw TooManyRequestsException(retryAfterMs) + val response = executeHttpRequest(paymentId, transactionId, amount, requestTimeout) + + val processingResult = processHttpResponse(response, paymentId, transactionId) + + if (processingResult) { + paymentESService.update(paymentId) { + it.logProcessing(true, now(), transactionId, "Success") + } + } + + return@withTimeoutOrNull processingResult + + } finally { + if (parallelLimiter.availablePermits < parallelRequests) { + parallelLimiter.release() + } + startedRequests.increment() } + } ?: run { + timeoutCounter.increment() + logger.warn("[$accountName] Payment $paymentId timed out") + false + } + private suspend fun executeHttpRequest( + paymentId: UUID, + transactionId: UUID, + amount: Int, + requestTimeout: Long + ): HttpResponse = withContext(Dispatchers.IO) { val request = HttpRequest.newBuilder() .uri(URI("http://$paymentProviderHostPort/external/process?serviceName=$serviceName&token=$token&accountName=$accountName&transactionId=$transactionId&paymentId=$paymentId&amount=$amount")) .timeout(Duration.ofMillis(requestTimeout)) .POST(HttpRequest.BodyPublishers.noBody()) .build() - val retryCount = 0L - - completeAction(retryCount, request, paymentId, transactionId, timeBeforeCall, deadline) + try { + val timeBeforeCall = now() + val response = httpClient.sendAsync(request, HttpResponse.BodyHandlers.ofString()).get() + timer.record(now() - timeBeforeCall, TimeUnit.MILLISECONDS) + response + } catch (e: Exception) { + logger.error("[$accountName] HTTP request failed for $paymentId", e) + throw e + } } - override fun price() = properties.price + private fun processHttpResponse( + response: HttpResponse, + paymentId: UUID, + transactionId: UUID + ): Boolean { + return try { + val rawBody = response.body() + val parsed = mapper.readValue(rawBody, ExternalSysResponse::class.java) + + if (!parsed.result) { + logger.warn("[$accountName] External system returned failure for $paymentId: ${parsed.message}") + failureCounter.increment() + } - override fun isEnabled() = properties.enabled + parsed.result + } catch (ex: Exception) { + logger.error("[$accountName] Failed to parse response for $paymentId", ex) + failureCounter.increment() + false + } + } - override fun name() = properties.accountName + private fun checkDeadline(deadline: Long, paymentId: UUID) { + val remaining = deadline - System.currentTimeMillis() + if (remaining <= 0) { + throw CancellationException("Deadline expired for payment $paymentId") + } - fun tryAcquire(startedAt: Long, remaining: Long): Boolean { - var isAcquired = parallelLimiter.tryAcquire() - while (!isAcquired && now() - startedAt < remaining) { - isAcquired = parallelLimiter.tryAcquire() - Thread.sleep(1) + val minRequiredTime = requestAverageProcessingTime.toMillis() * 2 + if (remaining < minRequiredTime) { + logger.warn("[$accountName] Not enough time for payment $paymentId: ${remaining}ms remaining, need ${minRequiredTime}ms") + throw TooManyRequestsException(minRequiredTime - remaining + Random.nextLong(100)) } + } - return isAcquired + private fun calculateRequestTimeout(deadline: Long): Long { + val remaining = deadline - System.currentTimeMillis() + return minOf( + remaining * 2 / 3, + requestAverageProcessingTime.toMillis() * 3 + ).coerceIn(100, 30000L) } - fun completeAction( - retryCount: Long, - request: HttpRequest, - paymentId: UUID, - transactionId: UUID, - timeBeforeCall: Long, - deadline: Long - ) { - httpClient.sendAsync(request, HttpResponse.BodyHandlers.ofString()) - .whenComplete { response, throwable -> - scope.launch { - try { - if (throwable != null) { - val e = throwable.cause - when (throwable.cause) { - is SocketTimeoutException -> { - logger.warn("[$accountName] attempt ${retryCount + 1} timeout: $paymentId", e) - } - - is InterruptedIOException -> { - logger.warn("[$accountName] interrupted: $paymentId", e) - } - - else -> { - logger.warn("[$accountName] io error: $paymentId", e) - } - } - - if (retryCount + 1 >= 3) { - paymentESService.update(paymentId) { - it.logProcessing(false, now(), transactionId, "Max attempts reached") - } - } else { - val backoff = ((2.0.pow(retryCount.toDouble()) * 25).toLong() + Random.nextLong(10)) - val capped = backoff.coerceAtMost(deadline - now() - 5) - if (capped <= 0) { - paymentESService.update(paymentId) { - it.logProcessing(false, now(), transactionId, "Deadline expired") - } - } else { - scheduleRetry( - retryCount, - request, - paymentId, - transactionId, - timeBeforeCall, - deadline, - capped - ) - return@launch - } - } - } else { - logger.warn("Free space in semaphore: {}", parallelLimiter.availablePermits) - logger.info( - "success in callback for payment: {}, retry count: {}, in time: {}", - paymentId, - retryCount, - now() - ) - val rawBody = response.body() - val parsed = try { - mapper.readValue(rawBody, ExternalSysResponse::class.java) - } catch (ex: Exception) { - ExternalSysResponse(transactionId.toString(), paymentId.toString(), false, ex.message) - } - - paymentESService.update(paymentId) { - it.logProcessing(parsed.result, now(), transactionId, parsed.message) - } - timer.record(now() - timeBeforeCall, TimeUnit.MILLISECONDS) - } - } catch (e: Exception) { - logger.error("[$accountName] Error processing payment $paymentId", e) - } finally { - startedRequests.increment() - parallelLimiter.release() - } - } - } + private fun calculateBackoff(retryCount: Int, deadline: Long): Long { + val baseDelay = (2.0.pow(retryCount.toDouble()) * 25).toLong() + val jitter = Random.nextLong(10) + val totalDelay = baseDelay + jitter + + val remaining = deadline - System.currentTimeMillis() + val safeDelay = minOf(totalDelay, remaining / 2) + + return safeDelay.coerceAtLeast(0) } - private fun scheduleRetry( - retryCount: Long, request: HttpRequest, paymentId: UUID, - transactionId: UUID, timeBeforeCall: Long, deadline: Long, delay: Long - ) { - retryScheduler.schedule({ - val remainingTime = deadline - now() - if (remainingTime < requestAverageProcessingTime.toMillis()) { - scope.launch { - try { - paymentESService.update(paymentId) { - it.logProcessing(false, now(), transactionId, "Not enough time for retry") - } - } catch (e: Exception) { - logger.error("[$accountName] Failed to record retry failure for $paymentId", e) - } finally { - startedRequests.increment() - parallelLimiter.release() + private fun handlePaymentFailure(paymentId: UUID, transactionId: UUID, cause: Exception) { + scope.launch { + try { + paymentESService.update(paymentId) { + val message = when (cause) { + is TooManyRequestsException -> "Rate limited" + is TimeoutCancellationException -> "Timeout" + is CancellationException -> "Cancelled" + else -> cause.message ?: "Unknown error" } + it.logProcessing(false, now(), transactionId, message) } - return@schedule + } catch (e: Exception) { + logger.error("[$accountName] Failed to record failure for payment $paymentId", e) } + } + } - val newRequestTimeout = remainingTime.coerceIn(100, requestAverageProcessingTime.toMillis() * 2) - val newRequest = HttpRequest.newBuilder() - .uri(request.uri()) - .timeout(Duration.ofMillis(newRequestTimeout)) - .POST(HttpRequest.BodyPublishers.noBody()) - .build() + override fun price() = properties.price + + override fun isEnabled() = properties.enabled + + override fun name() = properties.accountName - completeAction(retryCount + 1, newRequest, paymentId, transactionId, timeBeforeCall, deadline) - }, delay, TimeUnit.MILLISECONDS) + fun shutdown() { + logger.info("[$accountName] Shutting down payment adapter") + scope.cancel() + retryScheduler.shutdown() + (httpClient.executor().orElse(null) as? java.util.concurrent.ExecutorService)?.shutdown() } } From 3a925b2683dbed49606be4ce62c9c0ee7ca571ef Mon Sep 17 00:00:00 2001 From: 21092004Goda Date: Mon, 8 Dec 2025 01:36:52 +0300 Subject: [PATCH 22/24] Merge remote-tracking branch 'origin/hw-9-io-kuro' into hw-9-io-kuro # Conflicts: # src/main/kotlin/ru/quipy/payments/logic/PaymentExternalServiceImpl.kt --- .../logic/PaymentExternalServiceImpl.kt | 381 ++++++++---------- 1 file changed, 168 insertions(+), 213 deletions(-) diff --git a/src/main/kotlin/ru/quipy/payments/logic/PaymentExternalServiceImpl.kt b/src/main/kotlin/ru/quipy/payments/logic/PaymentExternalServiceImpl.kt index e92d13475..086cf25dc 100644 --- a/src/main/kotlin/ru/quipy/payments/logic/PaymentExternalServiceImpl.kt +++ b/src/main/kotlin/ru/quipy/payments/logic/PaymentExternalServiceImpl.kt @@ -3,10 +3,13 @@ package ru.quipy.payments.logic import com.fasterxml.jackson.databind.ObjectMapper import com.fasterxml.jackson.module.kotlin.registerKotlinModule import io.micrometer.core.instrument.MeterRegistry -import kotlinx.coroutines.* +import kotlinx.coroutines.CoroutineDispatcher +import kotlinx.coroutines.CoroutineExceptionHandler +import kotlinx.coroutines.CoroutineScope +import kotlinx.coroutines.SupervisorJob import kotlinx.coroutines.asCoroutineDispatcher +import kotlinx.coroutines.launch import kotlinx.coroutines.sync.Semaphore -import kotlinx.coroutines.sync.withPermit import org.slf4j.LoggerFactory import ru.quipy.common.utils.NamedThreadFactory import ru.quipy.common.utils.SlidingWindowRateLimiter @@ -22,12 +25,12 @@ import java.net.http.HttpResponse import java.time.Duration import java.util.* import java.util.concurrent.Executors -import java.util.concurrent.ScheduledFuture import java.util.concurrent.TimeUnit import kotlin.math.pow import kotlin.random.Random -import kotlin.time.Duration.Companion.milliseconds + +// Advice: always treat time as a Duration class PaymentExternalSystemAdapterImpl( private val properties: PaymentAccountProperties, private val paymentESService: EventSourcingService, @@ -51,17 +54,12 @@ class PaymentExternalSystemAdapterImpl( private val scope = CoroutineScope(SupervisorJob() + ioDispatcher + exceptionHandler) + + // 2025-11-20T20:30:35.780+03:00 INFO 56644 --- [alhost:1234/...] ru.quipy.core.EventSourcingService : Optimistic lock exception. Failed to save event records id: [7dca693e-e811-4b7f-8bce-23e13d952c04-4] private val startedRequests = meterRegistry.counter("payment.processing.started", "accountName", properties.accountName) private val timer = meterRegistry.timer("payment.external.system.request.latency", "accountName", properties.accountName) - private val successCounter = - meterRegistry.counter("payment.processing.success", "accountName", properties.accountName) - private val failureCounter = - meterRegistry.counter("payment.processing.failure", "accountName", properties.accountName) - private val timeoutCounter = - meterRegistry.counter("payment.processing.timeout", "accountName", properties.accountName) - private val serviceName = properties.serviceName private val accountName = properties.accountName private val requestAverageProcessingTime = properties.averageProcessingTime @@ -79,30 +77,27 @@ class PaymentExternalSystemAdapterImpl( .newBuilder() .executor(Executors.newFixedThreadPool(parallelRequests)) .version(HttpClient.Version.HTTP_2) - .connectTimeout(Duration.ofSeconds(10)) .build() private val retryScheduler = Executors.newScheduledThreadPool( Runtime.getRuntime().availableProcessors() ) - private val pendingRequests = Collections.synchronizedMap(mutableMapOf>()) - override fun performPaymentAsync(paymentId: UUID, amount: Int, paymentStartedAt: Long, deadline: Long) { - logger.info("[$accountName] Submitting payment request for payment $paymentId") + logger.warn("[$accountName] Submitting payment request for payment $paymentId") val transactionId = UUID.randomUUID() + // Вне зависимости от исхода оплаты важно отметить что она была отправлена. + // Это требуется сделать ВО ВСЕХ СЛУЧАЯХ, поскольку эта информация используется сервисом тестирования. scope.launch { try { - withTimeout(deadline - System.currentTimeMillis()) { - paymentESService.update(paymentId) { - it.logSubmission( - success = true, - transactionId, - now(), - Duration.ofMillis(now() - paymentStartedAt) - ) - } + paymentESService.update(paymentId) { + it.logSubmission( + success = true, + transactionId, + now(), + Duration.ofMillis(now() - paymentStartedAt) + ) } logger.info("[$accountName] Log submission recorded for $paymentId") } catch (e: Exception) { @@ -110,229 +105,189 @@ class PaymentExternalSystemAdapterImpl( } } - scope.launch { - try { - executePaymentWithRetries(paymentId, transactionId, amount, deadline) - } catch (e: Exception) { - logger.error("[$accountName] Failed to process payment $paymentId", e) - handlePaymentFailure(paymentId, transactionId, e) - } - } - } - - private suspend fun executePaymentWithRetries( - paymentId: UUID, - transactionId: UUID, - amount: Int, - deadline: Long - ) { - var retryCount = 0 - val maxRetries = 3 - - while (retryCount <= maxRetries) { - try { - checkDeadline(deadline, paymentId) - - val result = executePaymentAttempt(paymentId, transactionId, amount, deadline) - - if (result) { - successCounter.increment() - return - } else { - retryCount++ - if (retryCount <= maxRetries) { - val backoff = calculateBackoff(retryCount, deadline) - delay(backoff) - } - } - } catch (e: TooManyRequestsException) { - throw e - } catch (e: CancellationException) { - logger.warn("[$accountName] Payment $paymentId was cancelled") - throw e - } catch (e: Exception) { - logger.warn("[$accountName] Attempt ${retryCount + 1} failed for payment $paymentId", e) - retryCount++ - - if (retryCount > maxRetries) { - throw e - } + logger.info("[$accountName] Submit: $paymentId , txId: $transactionId") - val backoff = calculateBackoff(retryCount, deadline) - delay(backoff) + val remaining = deadline - now() + val minRequiredTime = requestAverageProcessingTime.toMillis() * 2 + if (remaining < minRequiredTime) { + logger.warn("[$accountName] Not enough time for payment $paymentId: ${remaining}ms remaining, need ${minRequiredTime}ms") + paymentESService.update(paymentId) { + it.logProcessing(false, now(), transactionId, "not enough time") } + val retryAfterMs = minRequiredTime - remaining + Random.nextLong(100) + throw TooManyRequestsException(retryAfterMs) } - throw RuntimeException("All retry attempts exhausted for payment $paymentId") - } + val timeBeforeCall = now() - private suspend fun executePaymentAttempt( - paymentId: UUID, - transactionId: UUID, - amount: Int, - deadline: Long - ): Boolean = withTimeoutOrNull(deadline - System.currentTimeMillis()) { - try { - val semaphoreAcquired = withTimeoutOrNull(deadline - System.currentTimeMillis()) { - parallelLimiter.tryAcquire() - if (!parallelLimiter.tryAcquire()) { - val waitTime = minOf(deadline - System.currentTimeMillis(), 100L) - if (waitTime > 0) { - delay(waitTime) - } - parallelLimiter.tryAcquire() - } else { - true - } - } ?: false - - if (!semaphoreAcquired) { - val retryAfter = (requestAverageProcessingTime.toMillis() / parallelRequests).coerceIn(10, 100) - throw TooManyRequestsException(retryAfter + Random.nextLong(10)) - } - - val rateLimitAcquired = withTimeoutOrNull(deadline - System.currentTimeMillis()) { - rateLimit.tickBlockingWithTimeout(deadline - System.currentTimeMillis()) - } ?: false - - if (!rateLimitAcquired) { - parallelLimiter.release() - val retryAfter = (1000L / rateLimitPerSec).coerceIn(10, 100) - throw TooManyRequestsException(retryAfter + Random.nextLong(10)) - } - - val requestTimeout = calculateRequestTimeout(deadline) + if (!tryAcquire(now(), deadline - now())) { + val retryAfterMs = (requestAverageProcessingTime.toMillis() / parallelRequests * 10).coerceIn( + 10, + 100 + ) + Random.nextLong(10) - val response = executeHttpRequest(paymentId, transactionId, amount, requestTimeout) + throw TooManyRequestsException(retryAfterMs) + } - val processingResult = processHttpResponse(response, paymentId, transactionId) + if (!rateLimit.tickBlockingWithTimeout(deadline - now())) { + parallelLimiter.release() - if (processingResult) { - paymentESService.update(paymentId) { - it.logProcessing(true, now(), transactionId, "Success") - } - } + val retryAfterMs = (1000L / rateLimitPerSec * 10).coerceIn(10, 100) + Random.nextLong(10) + throw TooManyRequestsException(retryAfterMs) + } - return@withTimeoutOrNull processingResult + val requestTimeout = minOf( + deadline - now(), + requestAverageProcessingTime.toMillis() * 2 + ).coerceAtLeast(100) - } finally { - if (parallelLimiter.availablePermits < parallelRequests) { - parallelLimiter.release() - } - startedRequests.increment() + if (requestTimeout < requestAverageProcessingTime.toMillis()) { + logger.warn("[$accountName] Timeout too short for payment $paymentId: ${requestTimeout}ms") + parallelLimiter.release() + val retryAfterMs = requestAverageProcessingTime.toMillis() - requestTimeout + Random.nextLong(100) + throw TooManyRequestsException(retryAfterMs) } - } ?: run { - timeoutCounter.increment() - logger.warn("[$accountName] Payment $paymentId timed out") - false - } - private suspend fun executeHttpRequest( - paymentId: UUID, - transactionId: UUID, - amount: Int, - requestTimeout: Long - ): HttpResponse = withContext(Dispatchers.IO) { val request = HttpRequest.newBuilder() .uri(URI("http://$paymentProviderHostPort/external/process?serviceName=$serviceName&token=$token&accountName=$accountName&transactionId=$transactionId&paymentId=$paymentId&amount=$amount")) .timeout(Duration.ofMillis(requestTimeout)) .POST(HttpRequest.BodyPublishers.noBody()) .build() - try { - val timeBeforeCall = now() - val response = httpClient.sendAsync(request, HttpResponse.BodyHandlers.ofString()).get() - timer.record(now() - timeBeforeCall, TimeUnit.MILLISECONDS) - response - } catch (e: Exception) { - logger.error("[$accountName] HTTP request failed for $paymentId", e) - throw e - } - } - - private fun processHttpResponse( - response: HttpResponse, - paymentId: UUID, - transactionId: UUID - ): Boolean { - return try { - val rawBody = response.body() - val parsed = mapper.readValue(rawBody, ExternalSysResponse::class.java) - - if (!parsed.result) { - logger.warn("[$accountName] External system returned failure for $paymentId: ${parsed.message}") - failureCounter.increment() - } + val retryCount = 0L - parsed.result - } catch (ex: Exception) { - logger.error("[$accountName] Failed to parse response for $paymentId", ex) - failureCounter.increment() - false - } + completeAction(retryCount, request, paymentId, transactionId, timeBeforeCall, deadline) } - private fun checkDeadline(deadline: Long, paymentId: UUID) { - val remaining = deadline - System.currentTimeMillis() - if (remaining <= 0) { - throw CancellationException("Deadline expired for payment $paymentId") - } - - val minRequiredTime = requestAverageProcessingTime.toMillis() * 2 - if (remaining < minRequiredTime) { - logger.warn("[$accountName] Not enough time for payment $paymentId: ${remaining}ms remaining, need ${minRequiredTime}ms") - throw TooManyRequestsException(minRequiredTime - remaining + Random.nextLong(100)) - } - } + override fun price() = properties.price - private fun calculateRequestTimeout(deadline: Long): Long { - val remaining = deadline - System.currentTimeMillis() - return minOf( - remaining * 2 / 3, - requestAverageProcessingTime.toMillis() * 3 - ).coerceIn(100, 30000L) - } + override fun isEnabled() = properties.enabled - private fun calculateBackoff(retryCount: Int, deadline: Long): Long { - val baseDelay = (2.0.pow(retryCount.toDouble()) * 25).toLong() - val jitter = Random.nextLong(10) - val totalDelay = baseDelay + jitter + override fun name() = properties.accountName - val remaining = deadline - System.currentTimeMillis() - val safeDelay = minOf(totalDelay, remaining / 2) + fun tryAcquire(startedAt: Long, remaining: Long): Boolean { + var isAcquired = parallelLimiter.tryAcquire() + while (!isAcquired && now() - startedAt < remaining) { + isAcquired = parallelLimiter.tryAcquire() + Thread.sleep(1) + } - return safeDelay.coerceAtLeast(0) + return isAcquired } - private fun handlePaymentFailure(paymentId: UUID, transactionId: UUID, cause: Exception) { - scope.launch { - try { - paymentESService.update(paymentId) { - val message = when (cause) { - is TooManyRequestsException -> "Rate limited" - is TimeoutCancellationException -> "Timeout" - is CancellationException -> "Cancelled" - else -> cause.message ?: "Unknown error" + fun completeAction( + retryCount: Long, + request: HttpRequest, + paymentId: UUID, + transactionId: UUID, + timeBeforeCall: Long, + deadline: Long + ) { + httpClient.sendAsync(request, HttpResponse.BodyHandlers.ofString()) + .whenComplete { response, throwable -> + scope.launch { + try { + if (throwable != null) { + val e = throwable.cause + when (throwable.cause) { + is SocketTimeoutException -> { + logger.warn("[$accountName] attempt ${retryCount + 1} timeout: $paymentId", e) + } + + is InterruptedIOException -> { + logger.warn("[$accountName] interrupted: $paymentId", e) + } + + else -> { + logger.warn("[$accountName] io error: $paymentId", e) + } + } + + if (retryCount + 1 >= 3) { + paymentESService.update(paymentId) { + it.logProcessing(false, now(), transactionId, "Max attempts reached") + } + } else { + val backoff = ((2.0.pow(retryCount.toDouble()) * 25).toLong() + Random.nextLong(10)) + val capped = backoff.coerceAtMost(deadline - now() - 5) + if (capped <= 0) { + paymentESService.update(paymentId) { + it.logProcessing(false, now(), transactionId, "Deadline expired") + } + } else { + scheduleRetry( + retryCount, + request, + paymentId, + transactionId, + timeBeforeCall, + deadline, + capped + ) + return@launch + } + } + } else { + logger.warn("Free space in semaphore: {}", parallelLimiter.availablePermits) + logger.info( + "success in callback for payment: {}, retry count: {}, in time: {}", + paymentId, + retryCount, + now() + ) + val rawBody = response.body() + val parsed = try { + mapper.readValue(rawBody, ExternalSysResponse::class.java) + } catch (ex: Exception) { + ExternalSysResponse(transactionId.toString(), paymentId.toString(), false, ex.message) + } + + paymentESService.update(paymentId) { + it.logProcessing(parsed.result, now(), transactionId, parsed.message) + } + timer.record(now() - timeBeforeCall, TimeUnit.MILLISECONDS) + } + } catch (e: Exception) { + logger.error("[$accountName] Error processing payment $paymentId", e) + } finally { + startedRequests.increment() + parallelLimiter.release() } - it.logProcessing(false, now(), transactionId, message) } - } catch (e: Exception) { - logger.error("[$accountName] Failed to record failure for payment $paymentId", e) } - } } - override fun price() = properties.price - - override fun isEnabled() = properties.enabled + private fun scheduleRetry( + retryCount: Long, request: HttpRequest, paymentId: UUID, + transactionId: UUID, timeBeforeCall: Long, deadline: Long, delay: Long + ) { + retryScheduler.schedule({ + val remainingTime = deadline - now() + if (remainingTime < requestAverageProcessingTime.toMillis()) { + scope.launch { + try { + paymentESService.update(paymentId) { + it.logProcessing(false, now(), transactionId, "Not enough time for retry") + } + } catch (e: Exception) { + logger.error("[$accountName] Failed to record retry failure for $paymentId", e) + } finally { + startedRequests.increment() + parallelLimiter.release() + } + } + return@schedule + } - override fun name() = properties.accountName + val newRequestTimeout = remainingTime.coerceIn(100, requestAverageProcessingTime.toMillis() * 2) + val newRequest = HttpRequest.newBuilder() + .uri(request.uri()) + .timeout(Duration.ofMillis(newRequestTimeout)) + .POST(HttpRequest.BodyPublishers.noBody()) + .build() - fun shutdown() { - logger.info("[$accountName] Shutting down payment adapter") - scope.cancel() - retryScheduler.shutdown() - (httpClient.executor().orElse(null) as? java.util.concurrent.ExecutorService)?.shutdown() + completeAction(retryCount + 1, newRequest, paymentId, transactionId, timeBeforeCall, deadline) + }, delay, TimeUnit.MILLISECONDS) } } From c5041fdb9267e94760065eedcd76a7fc70610a5c Mon Sep 17 00:00:00 2001 From: 21092004Goda Date: Mon, 8 Dec 2025 01:46:50 +0300 Subject: [PATCH 23/24] Merge remote-tracking branch 'origin/hw-9-io-kuro' into hw-9-io-kuro # Conflicts: # src/main/kotlin/ru/quipy/payments/logic/PaymentExternalServiceImpl.kt --- .../logic/PaymentExternalServiceImpl.kt | 256 ++++++++---------- 1 file changed, 118 insertions(+), 138 deletions(-) diff --git a/src/main/kotlin/ru/quipy/payments/logic/PaymentExternalServiceImpl.kt b/src/main/kotlin/ru/quipy/payments/logic/PaymentExternalServiceImpl.kt index 086cf25dc..79694a7cc 100644 --- a/src/main/kotlin/ru/quipy/payments/logic/PaymentExternalServiceImpl.kt +++ b/src/main/kotlin/ru/quipy/payments/logic/PaymentExternalServiceImpl.kt @@ -3,12 +3,7 @@ package ru.quipy.payments.logic import com.fasterxml.jackson.databind.ObjectMapper import com.fasterxml.jackson.module.kotlin.registerKotlinModule import io.micrometer.core.instrument.MeterRegistry -import kotlinx.coroutines.CoroutineDispatcher -import kotlinx.coroutines.CoroutineExceptionHandler -import kotlinx.coroutines.CoroutineScope -import kotlinx.coroutines.SupervisorJob -import kotlinx.coroutines.asCoroutineDispatcher -import kotlinx.coroutines.launch +import kotlinx.coroutines.* import kotlinx.coroutines.sync.Semaphore import org.slf4j.LoggerFactory import ru.quipy.common.utils.NamedThreadFactory @@ -26,11 +21,10 @@ import java.time.Duration import java.util.* import java.util.concurrent.Executors import java.util.concurrent.TimeUnit -import kotlin.math.pow +import kotlin.math.min import kotlin.random.Random -// Advice: always treat time as a Duration class PaymentExternalSystemAdapterImpl( private val properties: PaymentAccountProperties, private val paymentESService: EventSourcingService, @@ -39,43 +33,44 @@ class PaymentExternalSystemAdapterImpl( meterRegistry: MeterRegistry, private val parallelLimiter: Semaphore, private val ioDispatcher: CoroutineDispatcher = Executors.newFixedThreadPool( - Runtime.getRuntime().availableProcessors() * 2, + Runtime.getRuntime().availableProcessors() * 4, NamedThreadFactory("payment-io-") ).asCoroutineDispatcher() ) : PaymentExternalSystemAdapter { + companion object { val logger = LoggerFactory.getLogger(PaymentExternalSystemAdapter::class.java) val mapper = ObjectMapper().registerKotlinModule() } private val exceptionHandler = CoroutineExceptionHandler { _, throwable -> - logger.error("[$accountName] Unhandled exception in payment adapter coroutine", throwable) + logger.error("[${properties.accountName}] Unhandled exception", throwable) } private val scope = CoroutineScope(SupervisorJob() + ioDispatcher + exceptionHandler) - - // 2025-11-20T20:30:35.780+03:00 INFO 56644 --- [alhost:1234/...] ru.quipy.core.EventSourcingService : Optimistic lock exception. Failed to save event records id: [7dca693e-e811-4b7f-8bce-23e13d952c04-4] private val startedRequests = meterRegistry.counter("payment.processing.started", "accountName", properties.accountName) private val timer = meterRegistry.timer("payment.external.system.request.latency", "accountName", properties.accountName) + private val serviceName = properties.serviceName private val accountName = properties.accountName - private val requestAverageProcessingTime = properties.averageProcessingTime + private val requestAvg = properties.averageProcessingTime private val rateLimitPerSec = properties.rateLimitPerSec private val parallelRequests = properties.parallelRequests + private val rateLimit: SlidingWindowRateLimiter by lazy { SlidingWindowRateLimiter( - rate = (rateLimitPerSec * 0.9).toLong(), - window = Duration.ofMillis(1000), + rate = rateLimitPerSec.toLong(), + window = Duration.ofSeconds(1) ) } private val httpClient = HttpClient .newBuilder() - .executor(Executors.newFixedThreadPool(parallelRequests)) + .executor(Executors.newFixedThreadPool(parallelRequests * 2)) .version(HttpClient.Version.HTTP_2) .build() @@ -84,98 +79,77 @@ class PaymentExternalSystemAdapterImpl( ) override fun performPaymentAsync(paymentId: UUID, amount: Int, paymentStartedAt: Long, deadline: Long) { - logger.warn("[$accountName] Submitting payment request for payment $paymentId") val transactionId = UUID.randomUUID() - // Вне зависимости от исхода оплаты важно отметить что она была отправлена. - // Это требуется сделать ВО ВСЕХ СЛУЧАЯХ, поскольку эта информация используется сервисом тестирования. scope.launch { try { paymentESService.update(paymentId) { - it.logSubmission( - success = true, - transactionId, - now(), - Duration.ofMillis(now() - paymentStartedAt) - ) + it.logSubmission(true, transactionId, now(), Duration.ofMillis(now() - paymentStartedAt)) } - logger.info("[$accountName] Log submission recorded for $paymentId") - } catch (e: Exception) { - logger.error("[$accountName] Failed to record log submission for $paymentId", e) - } + } catch (_: Exception) {} } - logger.info("[$accountName] Submit: $paymentId , txId: $transactionId") + val startRemaining = deadline - now() + val minRequired = requestAvg.toMillis() * 2 - val remaining = deadline - now() - val minRequiredTime = requestAverageProcessingTime.toMillis() * 2 - if (remaining < minRequiredTime) { - logger.warn("[$accountName] Not enough time for payment $paymentId: ${remaining}ms remaining, need ${minRequiredTime}ms") - paymentESService.update(paymentId) { - it.logProcessing(false, now(), transactionId, "not enough time") - } - val retryAfterMs = minRequiredTime - remaining + Random.nextLong(100) - throw TooManyRequestsException(retryAfterMs) + if (startRemaining < minRequired) { + val retryAfter = minRequired - startRemaining + Random.nextLong(20) + throw TooManyRequestsException(retryAfter) } - val timeBeforeCall = now() - - if (!tryAcquire(now(), deadline - now())) { - val retryAfterMs = (requestAverageProcessingTime.toMillis() / parallelRequests * 10).coerceIn( - 10, - 100 - ) + Random.nextLong(10) - - throw TooManyRequestsException(retryAfterMs) + if (!tryAcquire(deadline)) { + val retryAfter = (requestAvg.toMillis() / parallelRequests * 5) + .coerceIn(20, 120) + Random.nextLong(20) + throw TooManyRequestsException(retryAfter) } if (!rateLimit.tickBlockingWithTimeout(deadline - now())) { parallelLimiter.release() - - val retryAfterMs = (1000L / rateLimitPerSec * 10).coerceIn(10, 100) + Random.nextLong(10) - throw TooManyRequestsException(retryAfterMs) + val retryAfter = (1000L / rateLimitPerSec * 5).coerceIn(20, 100) + Random.nextLong(20) + throw TooManyRequestsException(retryAfter) } - val requestTimeout = minOf( + val reqTimeout = min( deadline - now(), - requestAverageProcessingTime.toMillis() * 2 - ).coerceAtLeast(100) - - if (requestTimeout < requestAverageProcessingTime.toMillis()) { - logger.warn("[$accountName] Timeout too short for payment $paymentId: ${requestTimeout}ms") - parallelLimiter.release() - val retryAfterMs = requestAverageProcessingTime.toMillis() - requestTimeout + Random.nextLong(100) - throw TooManyRequestsException(retryAfterMs) - } + requestAvg.toMillis() * 2 + ).coerceAtLeast(150) val request = HttpRequest.newBuilder() - .uri(URI("http://$paymentProviderHostPort/external/process?serviceName=$serviceName&token=$token&accountName=$accountName&transactionId=$transactionId&paymentId=$paymentId&amount=$amount")) - .timeout(Duration.ofMillis(requestTimeout)) + .uri( + URI( + "http://$paymentProviderHostPort/external/process" + + "?serviceName=$serviceName&token=$token" + + "&accountName=$accountName&transactionId=$transactionId" + + "&paymentId=$paymentId&amount=$amount" + ) + ) + .timeout(Duration.ofMillis(reqTimeout)) .POST(HttpRequest.BodyPublishers.noBody()) .build() - val retryCount = 0L - - completeAction(retryCount, request, paymentId, transactionId, timeBeforeCall, deadline) + completeAction( + retryCount = 0, + request = request, + paymentId = paymentId, + transactionId = transactionId, + timeBeforeCall = now(), + deadline = deadline + ) } override fun price() = properties.price - override fun isEnabled() = properties.enabled + override fun name() = accountName - override fun name() = properties.accountName - - fun tryAcquire(startedAt: Long, remaining: Long): Boolean { - var isAcquired = parallelLimiter.tryAcquire() - while (!isAcquired && now() - startedAt < remaining) { - isAcquired = parallelLimiter.tryAcquire() + private fun tryAcquire(deadline: Long): Boolean { + while (now() < deadline) { + if (parallelLimiter.tryAcquire()) return true Thread.sleep(1) } - - return isAcquired + return false } - fun completeAction( + private fun completeAction( retryCount: Long, request: HttpRequest, paymentId: UUID, @@ -185,70 +159,61 @@ class PaymentExternalSystemAdapterImpl( ) { httpClient.sendAsync(request, HttpResponse.BodyHandlers.ofString()) .whenComplete { response, throwable -> + scope.launch { try { if (throwable != null) { - val e = throwable.cause - when (throwable.cause) { - is SocketTimeoutException -> { - logger.warn("[$accountName] attempt ${retryCount + 1} timeout: $paymentId", e) - } + val cause = throwable.cause - is InterruptedIOException -> { - logger.warn("[$accountName] interrupted: $paymentId", e) - } + val maxRetries = 3 + val nextRetry = retryCount + 1 - else -> { - logger.warn("[$accountName] io error: $paymentId", e) + if (nextRetry >= maxRetries) { + paymentESService.update(paymentId) { + it.logProcessing(false, now(), transactionId, "max retries reached") } + return@launch } - if (retryCount + 1 >= 3) { + val remaining = deadline - now() + val need = requestAvg.toMillis() + + if (remaining < need) { paymentESService.update(paymentId) { - it.logProcessing(false, now(), transactionId, "Max attempts reached") - } - } else { - val backoff = ((2.0.pow(retryCount.toDouble()) * 25).toLong() + Random.nextLong(10)) - val capped = backoff.coerceAtMost(deadline - now() - 5) - if (capped <= 0) { - paymentESService.update(paymentId) { - it.logProcessing(false, now(), transactionId, "Deadline expired") - } - } else { - scheduleRetry( - retryCount, - request, - paymentId, - transactionId, - timeBeforeCall, - deadline, - capped - ) - return@launch + it.logProcessing(false, now(), transactionId, "deadline expired") } + return@launch } - } else { - logger.warn("Free space in semaphore: {}", parallelLimiter.availablePermits) - logger.info( - "success in callback for payment: {}, retry count: {}, in time: {}", - paymentId, - retryCount, - now() + + val delayMs = (25L * (1L shl retryCount.toInt())).coerceAtMost(80L) + + scheduleRetry( + retryCount = nextRetry, + request = request, + paymentId = paymentId, + transactionId = transactionId, + timeBeforeCall = timeBeforeCall, + deadline = deadline, + delay = delayMs ) - val rawBody = response.body() - val parsed = try { - mapper.readValue(rawBody, ExternalSysResponse::class.java) - } catch (ex: Exception) { - ExternalSysResponse(transactionId.toString(), paymentId.toString(), false, ex.message) - } - paymentESService.update(paymentId) { - it.logProcessing(parsed.result, now(), transactionId, parsed.message) - } - timer.record(now() - timeBeforeCall, TimeUnit.MILLISECONDS) + return@launch } - } catch (e: Exception) { - logger.error("[$accountName] Error processing payment $paymentId", e) + + // успех + val body = response.body() + val parsed = try { + mapper.readValue(body, ExternalSysResponse::class.java) + } catch (ex: Exception) { + ExternalSysResponse(transactionId.toString(), paymentId.toString(), false, ex.message) + } + + paymentESService.update(paymentId) { + it.logProcessing(parsed.result, now(), transactionId, parsed.message) + } + + timer.record(now() - timeBeforeCall, TimeUnit.MILLISECONDS) + } finally { startedRequests.increment() parallelLimiter.release() @@ -258,19 +223,25 @@ class PaymentExternalSystemAdapterImpl( } private fun scheduleRetry( - retryCount: Long, request: HttpRequest, paymentId: UUID, - transactionId: UUID, timeBeforeCall: Long, deadline: Long, delay: Long + retryCount: Long, + request: HttpRequest, + paymentId: UUID, + transactionId: UUID, + timeBeforeCall: Long, + deadline: Long, + delay: Long ) { retryScheduler.schedule({ - val remainingTime = deadline - now() - if (remainingTime < requestAverageProcessingTime.toMillis()) { + + val remaining = deadline - now() + val need = requestAvg.toMillis() + + if (remaining < need) { scope.launch { try { paymentESService.update(paymentId) { - it.logProcessing(false, now(), transactionId, "Not enough time for retry") + it.logProcessing(false, now(), transactionId, "deadline expired retry") } - } catch (e: Exception) { - logger.error("[$accountName] Failed to record retry failure for $paymentId", e) } finally { startedRequests.increment() parallelLimiter.release() @@ -279,16 +250,25 @@ class PaymentExternalSystemAdapterImpl( return@schedule } - val newRequestTimeout = remainingTime.coerceIn(100, requestAverageProcessingTime.toMillis() * 2) - val newRequest = HttpRequest.newBuilder() + val timeout = min(remaining, requestAvg.toMillis() * 2).coerceAtLeast(100) + + val newReq = HttpRequest.newBuilder() .uri(request.uri()) - .timeout(Duration.ofMillis(newRequestTimeout)) + .timeout(Duration.ofMillis(timeout)) .POST(HttpRequest.BodyPublishers.noBody()) .build() - completeAction(retryCount + 1, newRequest, paymentId, transactionId, timeBeforeCall, deadline) + completeAction( + retryCount = retryCount, + request = newReq, + paymentId = paymentId, + transactionId = transactionId, + timeBeforeCall = timeBeforeCall, + deadline = deadline + ) + }, delay, TimeUnit.MILLISECONDS) } } -fun now() = System.currentTimeMillis() \ No newline at end of file +fun now() = System.currentTimeMillis() From e139c497a8d3f436d0e37979d848603fd900f475 Mon Sep 17 00:00:00 2001 From: 21092004Goda Date: Mon, 8 Dec 2025 01:50:09 +0300 Subject: [PATCH 24/24] Merge remote-tracking branch 'origin/hw-9-io-kuro' into hw-9-io-kuro # Conflicts: # src/main/kotlin/ru/quipy/payments/logic/PaymentExternalServiceImpl.kt --- .../logic/PaymentExternalServiceImpl.kt | 256 ++++++++++-------- 1 file changed, 138 insertions(+), 118 deletions(-) diff --git a/src/main/kotlin/ru/quipy/payments/logic/PaymentExternalServiceImpl.kt b/src/main/kotlin/ru/quipy/payments/logic/PaymentExternalServiceImpl.kt index 79694a7cc..598992345 100644 --- a/src/main/kotlin/ru/quipy/payments/logic/PaymentExternalServiceImpl.kt +++ b/src/main/kotlin/ru/quipy/payments/logic/PaymentExternalServiceImpl.kt @@ -3,7 +3,12 @@ package ru.quipy.payments.logic import com.fasterxml.jackson.databind.ObjectMapper import com.fasterxml.jackson.module.kotlin.registerKotlinModule import io.micrometer.core.instrument.MeterRegistry -import kotlinx.coroutines.* +import kotlinx.coroutines.CoroutineDispatcher +import kotlinx.coroutines.CoroutineExceptionHandler +import kotlinx.coroutines.CoroutineScope +import kotlinx.coroutines.SupervisorJob +import kotlinx.coroutines.asCoroutineDispatcher +import kotlinx.coroutines.launch import kotlinx.coroutines.sync.Semaphore import org.slf4j.LoggerFactory import ru.quipy.common.utils.NamedThreadFactory @@ -21,10 +26,11 @@ import java.time.Duration import java.util.* import java.util.concurrent.Executors import java.util.concurrent.TimeUnit -import kotlin.math.min +import kotlin.math.pow import kotlin.random.Random +// Advice: always treat time as a Duration class PaymentExternalSystemAdapterImpl( private val properties: PaymentAccountProperties, private val paymentESService: EventSourcingService, @@ -33,44 +39,43 @@ class PaymentExternalSystemAdapterImpl( meterRegistry: MeterRegistry, private val parallelLimiter: Semaphore, private val ioDispatcher: CoroutineDispatcher = Executors.newFixedThreadPool( - Runtime.getRuntime().availableProcessors() * 4, + Runtime.getRuntime().availableProcessors() * 2, NamedThreadFactory("payment-io-") ).asCoroutineDispatcher() ) : PaymentExternalSystemAdapter { - companion object { val logger = LoggerFactory.getLogger(PaymentExternalSystemAdapter::class.java) val mapper = ObjectMapper().registerKotlinModule() } private val exceptionHandler = CoroutineExceptionHandler { _, throwable -> - logger.error("[${properties.accountName}] Unhandled exception", throwable) + logger.error("[$accountName] Unhandled exception in payment adapter coroutine", throwable) } private val scope = CoroutineScope(SupervisorJob() + ioDispatcher + exceptionHandler) + + // 2025-11-20T20:30:35.780+03:00 INFO 56644 --- [alhost:1234/...] ru.quipy.core.EventSourcingService : Optimistic lock exception. Failed to save event records id: [7dca693e-e811-4b7f-8bce-23e13d952c04-4] private val startedRequests = meterRegistry.counter("payment.processing.started", "accountName", properties.accountName) private val timer = meterRegistry.timer("payment.external.system.request.latency", "accountName", properties.accountName) - private val serviceName = properties.serviceName private val accountName = properties.accountName - private val requestAvg = properties.averageProcessingTime + private val requestAverageProcessingTime = properties.averageProcessingTime private val rateLimitPerSec = properties.rateLimitPerSec private val parallelRequests = properties.parallelRequests - private val rateLimit: SlidingWindowRateLimiter by lazy { SlidingWindowRateLimiter( - rate = rateLimitPerSec.toLong(), - window = Duration.ofSeconds(1) + rate = (rateLimitPerSec * 0.95).toLong(), + window = Duration.ofMillis(1000), ) } private val httpClient = HttpClient .newBuilder() - .executor(Executors.newFixedThreadPool(parallelRequests * 2)) + .executor(Executors.newFixedThreadPool(parallelRequests)) .version(HttpClient.Version.HTTP_2) .build() @@ -79,77 +84,98 @@ class PaymentExternalSystemAdapterImpl( ) override fun performPaymentAsync(paymentId: UUID, amount: Int, paymentStartedAt: Long, deadline: Long) { + logger.warn("[$accountName] Submitting payment request for payment $paymentId") val transactionId = UUID.randomUUID() + // Вне зависимости от исхода оплаты важно отметить что она была отправлена. + // Это требуется сделать ВО ВСЕХ СЛУЧАЯХ, поскольку эта информация используется сервисом тестирования. scope.launch { try { paymentESService.update(paymentId) { - it.logSubmission(true, transactionId, now(), Duration.ofMillis(now() - paymentStartedAt)) + it.logSubmission( + success = true, + transactionId, + now(), + Duration.ofMillis(now() - paymentStartedAt) + ) } - } catch (_: Exception) {} + logger.info("[$accountName] Log submission recorded for $paymentId") + } catch (e: Exception) { + logger.error("[$accountName] Failed to record log submission for $paymentId", e) + } } - val startRemaining = deadline - now() - val minRequired = requestAvg.toMillis() * 2 + logger.info("[$accountName] Submit: $paymentId , txId: $transactionId") - if (startRemaining < minRequired) { - val retryAfter = minRequired - startRemaining + Random.nextLong(20) - throw TooManyRequestsException(retryAfter) + val remaining = deadline - now() + val minRequiredTime = requestAverageProcessingTime.toMillis() * 2 + if (remaining < minRequiredTime) { + logger.warn("[$accountName] Not enough time for payment $paymentId: ${remaining}ms remaining, need ${minRequiredTime}ms") + paymentESService.update(paymentId) { + it.logProcessing(false, now(), transactionId, "not enough time") + } + val retryAfterMs = minRequiredTime - remaining + Random.nextLong(100) + throw TooManyRequestsException(retryAfterMs) } - if (!tryAcquire(deadline)) { - val retryAfter = (requestAvg.toMillis() / parallelRequests * 5) - .coerceIn(20, 120) + Random.nextLong(20) - throw TooManyRequestsException(retryAfter) + val timeBeforeCall = now() + + if (!tryAcquire(now(), deadline - now())) { + val retryAfterMs = (requestAverageProcessingTime.toMillis() / parallelRequests * 10).coerceIn( + 10, + 100 + ) + Random.nextLong(10) + + throw TooManyRequestsException(retryAfterMs) } if (!rateLimit.tickBlockingWithTimeout(deadline - now())) { parallelLimiter.release() - val retryAfter = (1000L / rateLimitPerSec * 5).coerceIn(20, 100) + Random.nextLong(20) - throw TooManyRequestsException(retryAfter) + + val retryAfterMs = (1000L / rateLimitPerSec * 10).coerceIn(10, 100) + Random.nextLong(10) + throw TooManyRequestsException(retryAfterMs) } - val reqTimeout = min( + val requestTimeout = minOf( deadline - now(), - requestAvg.toMillis() * 2 - ).coerceAtLeast(150) + requestAverageProcessingTime.toMillis() * 2 + ).coerceAtLeast(100) + + if (requestTimeout < requestAverageProcessingTime.toMillis()) { + logger.warn("[$accountName] Timeout too short for payment $paymentId: ${requestTimeout}ms") + parallelLimiter.release() + val retryAfterMs = requestAverageProcessingTime.toMillis() - requestTimeout + Random.nextLong(100) + throw TooManyRequestsException(retryAfterMs) + } val request = HttpRequest.newBuilder() - .uri( - URI( - "http://$paymentProviderHostPort/external/process" + - "?serviceName=$serviceName&token=$token" + - "&accountName=$accountName&transactionId=$transactionId" + - "&paymentId=$paymentId&amount=$amount" - ) - ) - .timeout(Duration.ofMillis(reqTimeout)) + .uri(URI("http://$paymentProviderHostPort/external/process?serviceName=$serviceName&token=$token&accountName=$accountName&transactionId=$transactionId&paymentId=$paymentId&amount=$amount")) + .timeout(Duration.ofMillis(requestTimeout)) .POST(HttpRequest.BodyPublishers.noBody()) .build() - completeAction( - retryCount = 0, - request = request, - paymentId = paymentId, - transactionId = transactionId, - timeBeforeCall = now(), - deadline = deadline - ) + val retryCount = 0L + + completeAction(retryCount, request, paymentId, transactionId, timeBeforeCall, deadline) } override fun price() = properties.price + override fun isEnabled() = properties.enabled - override fun name() = accountName - private fun tryAcquire(deadline: Long): Boolean { - while (now() < deadline) { - if (parallelLimiter.tryAcquire()) return true + override fun name() = properties.accountName + + fun tryAcquire(startedAt: Long, remaining: Long): Boolean { + var isAcquired = parallelLimiter.tryAcquire() + while (!isAcquired && now() - startedAt < remaining) { + isAcquired = parallelLimiter.tryAcquire() Thread.sleep(1) } - return false + + return isAcquired } - private fun completeAction( + fun completeAction( retryCount: Long, request: HttpRequest, paymentId: UUID, @@ -159,61 +185,70 @@ class PaymentExternalSystemAdapterImpl( ) { httpClient.sendAsync(request, HttpResponse.BodyHandlers.ofString()) .whenComplete { response, throwable -> - scope.launch { try { if (throwable != null) { - val cause = throwable.cause + val e = throwable.cause + when (throwable.cause) { + is SocketTimeoutException -> { + logger.warn("[$accountName] attempt ${retryCount + 1} timeout: $paymentId", e) + } - val maxRetries = 3 - val nextRetry = retryCount + 1 + is InterruptedIOException -> { + logger.warn("[$accountName] interrupted: $paymentId", e) + } - if (nextRetry >= maxRetries) { - paymentESService.update(paymentId) { - it.logProcessing(false, now(), transactionId, "max retries reached") + else -> { + logger.warn("[$accountName] io error: $paymentId", e) } - return@launch } - val remaining = deadline - now() - val need = requestAvg.toMillis() - - if (remaining < need) { + if (retryCount + 1 >= 3) { paymentESService.update(paymentId) { - it.logProcessing(false, now(), transactionId, "deadline expired") + it.logProcessing(false, now(), transactionId, "Max attempts reached") + } + } else { + val backoff = ((2.0.pow(retryCount.toDouble()) * 25).toLong() + Random.nextLong(10)) + val capped = backoff.coerceAtMost(deadline - now() - 5) + if (capped <= 0) { + paymentESService.update(paymentId) { + it.logProcessing(false, now(), transactionId, "Deadline expired") + } + } else { + scheduleRetry( + retryCount, + request, + paymentId, + transactionId, + timeBeforeCall, + deadline, + capped + ) + return@launch } - return@launch } - - val delayMs = (25L * (1L shl retryCount.toInt())).coerceAtMost(80L) - - scheduleRetry( - retryCount = nextRetry, - request = request, - paymentId = paymentId, - transactionId = transactionId, - timeBeforeCall = timeBeforeCall, - deadline = deadline, - delay = delayMs + } else { + logger.warn("Free space in semaphore: {}", parallelLimiter.availablePermits) + logger.info( + "success in callback for payment: {}, retry count: {}, in time: {}", + paymentId, + retryCount, + now() ) + val rawBody = response.body() + val parsed = try { + mapper.readValue(rawBody, ExternalSysResponse::class.java) + } catch (ex: Exception) { + ExternalSysResponse(transactionId.toString(), paymentId.toString(), false, ex.message) + } - return@launch - } - - // успех - val body = response.body() - val parsed = try { - mapper.readValue(body, ExternalSysResponse::class.java) - } catch (ex: Exception) { - ExternalSysResponse(transactionId.toString(), paymentId.toString(), false, ex.message) - } - - paymentESService.update(paymentId) { - it.logProcessing(parsed.result, now(), transactionId, parsed.message) + paymentESService.update(paymentId) { + it.logProcessing(parsed.result, now(), transactionId, parsed.message) + } + timer.record(now() - timeBeforeCall, TimeUnit.MILLISECONDS) } - - timer.record(now() - timeBeforeCall, TimeUnit.MILLISECONDS) - + } catch (e: Exception) { + logger.error("[$accountName] Error processing payment $paymentId", e) } finally { startedRequests.increment() parallelLimiter.release() @@ -223,25 +258,19 @@ class PaymentExternalSystemAdapterImpl( } private fun scheduleRetry( - retryCount: Long, - request: HttpRequest, - paymentId: UUID, - transactionId: UUID, - timeBeforeCall: Long, - deadline: Long, - delay: Long + retryCount: Long, request: HttpRequest, paymentId: UUID, + transactionId: UUID, timeBeforeCall: Long, deadline: Long, delay: Long ) { retryScheduler.schedule({ - - val remaining = deadline - now() - val need = requestAvg.toMillis() - - if (remaining < need) { + val remainingTime = deadline - now() + if (remainingTime < requestAverageProcessingTime.toMillis()) { scope.launch { try { paymentESService.update(paymentId) { - it.logProcessing(false, now(), transactionId, "deadline expired retry") + it.logProcessing(false, now(), transactionId, "Not enough time for retry") } + } catch (e: Exception) { + logger.error("[$accountName] Failed to record retry failure for $paymentId", e) } finally { startedRequests.increment() parallelLimiter.release() @@ -250,25 +279,16 @@ class PaymentExternalSystemAdapterImpl( return@schedule } - val timeout = min(remaining, requestAvg.toMillis() * 2).coerceAtLeast(100) - - val newReq = HttpRequest.newBuilder() + val newRequestTimeout = remainingTime.coerceIn(100, requestAverageProcessingTime.toMillis() * 2) + val newRequest = HttpRequest.newBuilder() .uri(request.uri()) - .timeout(Duration.ofMillis(timeout)) + .timeout(Duration.ofMillis(newRequestTimeout)) .POST(HttpRequest.BodyPublishers.noBody()) .build() - completeAction( - retryCount = retryCount, - request = newReq, - paymentId = paymentId, - transactionId = transactionId, - timeBeforeCall = timeBeforeCall, - deadline = deadline - ) - + completeAction(retryCount + 1, newRequest, paymentId, transactionId, timeBeforeCall, deadline) }, delay, TimeUnit.MILLISECONDS) } } -fun now() = System.currentTimeMillis() +fun now() = System.currentTimeMillis() \ No newline at end of file