From 148a40e0d426b027db648b46cc66afbb9dce0d26 Mon Sep 17 00:00:00 2001 From: vaycheslav Date: Thu, 4 Dec 2025 15:37:52 +0300 Subject: [PATCH 1/9] feature: added corutines and callback logic --- .../kotlin/ru/quipy/OnlineShopApplication.kt | 5 +- .../ru/quipy/apigateway/APIController.kt | 4 +- .../orders/subscribers/PaymentSubscriber.kt | 2 +- .../ru/quipy/payments/logic/OrderPayer.kt | 22 ++-- .../quipy/payments/logic/PaymentCallback.kt | 90 ++++++++++++++ .../logic/PaymentExternalServiceImpl.kt | 112 +++++------------- .../ru/quipy/payments/logic/PaymentService.kt | 2 +- .../payments/logic/PaymentServiceImpl.kt | 10 +- src/main/resources/application.properties | 2 +- 9 files changed, 134 insertions(+), 115 deletions(-) create mode 100644 src/main/kotlin/ru/quipy/payments/logic/PaymentCallback.kt diff --git a/src/main/kotlin/ru/quipy/OnlineShopApplication.kt b/src/main/kotlin/ru/quipy/OnlineShopApplication.kt index 9ac22e094..1fcd3f89c 100644 --- a/src/main/kotlin/ru/quipy/OnlineShopApplication.kt +++ b/src/main/kotlin/ru/quipy/OnlineShopApplication.kt @@ -1,5 +1,6 @@ package ru.quipy +import kotlinx.coroutines.asCoroutineDispatcher import org.slf4j.Logger import org.slf4j.LoggerFactory import org.springframework.boot.autoconfigure.SpringBootApplication @@ -13,10 +14,10 @@ class OnlineShopApplication { val log: Logger = LoggerFactory.getLogger(OnlineShopApplication::class.java) companion object { - val appExecutor = Executors.newFixedThreadPool(64, NamedThreadFactory("main-app-executor")) + val appExecutor = Executors.newFixedThreadPool(20_000, NamedThreadFactory("main-app-executor")).asCoroutineDispatcher() } } -fun main(args: Array) { +suspend fun main(args: Array) { runApplication(*args) } diff --git a/src/main/kotlin/ru/quipy/apigateway/APIController.kt b/src/main/kotlin/ru/quipy/apigateway/APIController.kt index c07d96b7a..3720312da 100644 --- a/src/main/kotlin/ru/quipy/apigateway/APIController.kt +++ b/src/main/kotlin/ru/quipy/apigateway/APIController.kt @@ -17,7 +17,7 @@ class APIController( private val orderRepository: OrderRepository, private val orderPayer: OrderPayer, @field:Qualifier("parallelLimiter") - private val rateLimiter: RateLimiter = LeakingBucketRateLimiter(8, Duration.ofSeconds(1), 30) + private val rateLimiter: RateLimiter = LeakingBucketRateLimiter(1100, Duration.ofSeconds(1), 4400) ) { val logger: Logger = LoggerFactory.getLogger(APIController::class.java) @@ -58,7 +58,7 @@ class APIController( } @PostMapping("/orders/{orderId}/payment") - fun payOrder(@PathVariable orderId: UUID, @RequestParam deadline: Long): PaymentSubmissionDto { + suspend fun payOrder(@PathVariable orderId: UUID, @RequestParam deadline: Long): PaymentSubmissionDto { if (!rateLimiter.tick()) { throw TooManyRequestsException(deadline) } diff --git a/src/main/kotlin/ru/quipy/orders/subscribers/PaymentSubscriber.kt b/src/main/kotlin/ru/quipy/orders/subscribers/PaymentSubscriber.kt index a4c312eaf..0b7560804 100644 --- a/src/main/kotlin/ru/quipy/orders/subscribers/PaymentSubscriber.kt +++ b/src/main/kotlin/ru/quipy/orders/subscribers/PaymentSubscriber.kt @@ -36,7 +36,7 @@ class PaymentSubscriber { retryConf = RetryConf(1, RetryFailedStrategy.SKIP_EVENT) ) { `when`(PaymentProcessedEvent::class) { event -> - appExecutor.submit { + appExecutor.run { logger.trace( "Payment results. OrderId ${event.orderId}, succeeded: ${event.success}, txId: ${event.transactionId}, reason: ${event.reason}, duration: ${ Duration.ofMillis( diff --git a/src/main/kotlin/ru/quipy/payments/logic/OrderPayer.kt b/src/main/kotlin/ru/quipy/payments/logic/OrderPayer.kt index 359aa0a59..4835a1d26 100644 --- a/src/main/kotlin/ru/quipy/payments/logic/OrderPayer.kt +++ b/src/main/kotlin/ru/quipy/payments/logic/OrderPayer.kt @@ -36,33 +36,25 @@ class OrderPayer { @Autowired private lateinit var paymentService: PaymentService - private val paymentExecutor = ThreadPoolExecutor( - 16, - 16, - 0, - TimeUnit.MILLISECONDS, - LinkedBlockingQueue(8_000), - NamedThreadFactory("payment-submission-executor"), - CallerBlockingRejectedExecutionHandler() - ) - fun processPayment(orderId: UUID, amount: Int, paymentId: UUID, deadline: Long): Long { + suspend fun processPayment(orderId: UUID, amount: Int, paymentId: UUID, deadline: Long): Long { val createdAt = System.currentTimeMillis() if (!rateLimit.tickBlocking(deadline- System.currentTimeMillis())) { throw TooManyRequestsException(deadline) } - paymentExecutor.submit { - val createdEvent = paymentESService.create { + + val createdEvent = paymentESService.create { it.create( paymentId, orderId, amount ) } - logger.trace("Payment ${createdEvent.paymentId} for order $orderId created.") - paymentService.submitPaymentRequest(paymentId, amount, createdAt, deadline) - } + logger.trace("Payment ${createdEvent.paymentId} for order $orderId created.") + + paymentService.submitPaymentRequest(paymentId, amount, createdAt, deadline) + return createdAt } } \ No newline at end of file diff --git a/src/main/kotlin/ru/quipy/payments/logic/PaymentCallback.kt b/src/main/kotlin/ru/quipy/payments/logic/PaymentCallback.kt new file mode 100644 index 000000000..3b37418c6 --- /dev/null +++ b/src/main/kotlin/ru/quipy/payments/logic/PaymentCallback.kt @@ -0,0 +1,90 @@ +package ru.quipy.payments.logic + +import io.micrometer.core.instrument.Timer +import kotlinx.coroutines.sync.Semaphore +import okhttp3.Call +import okhttp3.Callback +import okhttp3.OkHttpClient +import okhttp3.Request +import okhttp3.Response +import ru.quipy.core.EventSourcingService +import ru.quipy.payments.api.PaymentAggregate +import ru.quipy.payments.logic.PaymentExternalSystemAdapterImpl.Companion.logger +import ru.quipy.payments.logic.PaymentExternalSystemAdapterImpl.Companion.mapper +import java.io.InterruptedIOException +import java.net.SocketTimeoutException +import java.util.UUID +import java.util.concurrent.TimeUnit +import kotlin.math.pow + + +class PaymentCallback(val semaphore: Semaphore, val accountName: String, val retryCount: Int, val paymentId: UUID, val transactionId: UUID, val paymentESService: EventSourcingService, val client: OkHttpClient, val request: Request, val timer: Timer, val deadline: Long, val timeBeforeCall: Long) : Callback { + override fun onFailure(call: Call, e: java.io.IOException) { + + logger.debug("fail in callback for payment: {}, retry count: {}, deadline: {}, in time: {}", paymentId, retryCount, deadline, now(), e) + when (e) { + is SocketTimeoutException -> { + logger.warn("[$accountName] attempt ${retryCount + 1} timeout: $paymentId", e) + paymentESService.update(paymentId) { + it.logProcessing(false, now(), transactionId, "socket timeout") + } + } + + is InterruptedIOException -> { + logger.warn("[$accountName] interrupted: $paymentId", e) + + // Здесь мы обновляем состояние оплаты в зависимости от результата в базе данных оплат. + // Это требуется сделать ВО ВСЕХ ИСХОДАХ (успешная оплата / неуспешная / ошибочная ситуация) + paymentESService.update(paymentId) { + it.logProcessing(false, now(), transactionId, "interrupted IO") + } + } + + else -> { + logger.warn("[$accountName] io error: $paymentId", e) + paymentESService.update(paymentId) { + it.logProcessing(false, now(), transactionId, "io exception") + } + } + } + + if (retryCount + 1 >= 3) { + paymentESService.update(paymentId) { + it.logProcessing(false, now(), transactionId, "Max attempts reached") + } + return + } + + val backoff = ((2.0.pow(retryCount.toDouble()) * 25).toLong() + kotlin.random.Random.nextLong(10)) + val capped = backoff.coerceAtMost(deadline - now() - 5) + if (capped <= 0) { + paymentESService.update(paymentId) { + it.logProcessing(false, now(), transactionId, "Deadline expired") + } + return + } + val nCall = client.newCall(request) + nCall.enqueue(PaymentCallback(semaphore, accountName, retryCount + 1, paymentId, transactionId, paymentESService, client, request, timer, deadline, now())) + timer.record(now() - timeBeforeCall, TimeUnit.MILLISECONDS) + semaphore.release() + + } + + override fun onResponse(call: Call, response: Response) { + + logger.debug("success in callback for payment: {}, retry count: {}, deadline: {}, in time: {}", paymentId, retryCount, deadline, now()) + + val rawBody = response.body?.string() + val parsed = try { + mapper.readValue(rawBody, ExternalSysResponse::class.java) + } catch (ex: Exception) { + ExternalSysResponse(transactionId.toString(), paymentId.toString(), false, ex.message) + } + + paymentESService.update(paymentId) { + it.logProcessing(parsed.result, now(), transactionId, parsed.message) + } + } + + fun now() = System.currentTimeMillis() +} \ No newline at end of file diff --git a/src/main/kotlin/ru/quipy/payments/logic/PaymentExternalServiceImpl.kt b/src/main/kotlin/ru/quipy/payments/logic/PaymentExternalServiceImpl.kt index f0e84098c..a1489a699 100644 --- a/src/main/kotlin/ru/quipy/payments/logic/PaymentExternalServiceImpl.kt +++ b/src/main/kotlin/ru/quipy/payments/logic/PaymentExternalServiceImpl.kt @@ -3,11 +3,17 @@ package ru.quipy.payments.logic import com.fasterxml.jackson.databind.ObjectMapper import com.fasterxml.jackson.module.kotlin.registerKotlinModule import io.micrometer.core.instrument.MeterRegistry +import kotlinx.coroutines.delay +import kotlinx.coroutines.flow.callbackFlow +import okhttp3.Call +import okhttp3.Callback import okhttp3.OkHttpClient import okhttp3.Request import okhttp3.RequestBody +import okhttp3.Response import okio.IOException import org.slf4j.LoggerFactory +import org.springframework.boot.autoconfigure.integration.IntegrationProperties import ru.quipy.core.EventSourcingService import ru.quipy.payments.api.PaymentAggregate import java.io.InterruptedIOException @@ -35,7 +41,7 @@ class PaymentExternalSystemAdapterImpl( val emptyBody = RequestBody.create(null, ByteArray(0)) val mapper = ObjectMapper().registerKotlinModule() } - + // 2025-11-20T20:30:35.780+03:00 INFO 56644 --- [alhost:1234/...] ru.quipy.core.EventSourcingService : Optimistic lock exception. Failed to save event records id: [7dca693e-e811-4b7f-8bce-23e13d952c04-4] private val timer = meterRegistry.timer("payment.external.system.request.latency", "accountName", properties.accountName) private val serviceName = properties.serviceName private val accountName = properties.accountName @@ -78,91 +84,29 @@ class PaymentExternalSystemAdapterImpl( val timeBeforeCall = now() var shouldRetry = false - try { - val perCallTimeoutMs = remaining.coerceAtMost(1100) - val request = Request.Builder() - .url( - "http://$paymentProviderHostPort/external/process" + + val perCallTimeoutMs = remaining.coerceAtMost(1100) + val request = Request.Builder() + .url( + "http://$paymentProviderHostPort/external/process" + "?serviceName=$serviceName&token=$token&accountName=$accountName" + "&transactionId=$transactionId&paymentId=$paymentId&amount=$amount" - ) - .post(emptyBody) - .build() - - val call = client.newCall(request) - call.timeout().timeout(perCallTimeoutMs, TimeUnit.MILLISECONDS) - - call.execute().use { response -> - val rawBody = response.body?.string() - val parsed = try { - mapper.readValue(rawBody, ExternalSysResponse::class.java) - } catch (ex: Exception) { - ExternalSysResponse(transactionId.toString(), paymentId.toString(), false, ex.message) - } - - shouldRetry = !parsed.result && (response.code == 429 || response.code >= 500) - - paymentESService.update(paymentId) { - it.logProcessing(parsed.result, now(), transactionId, parsed.message) - } - - if (parsed.result) { - return - } - } - } catch (e: SocketTimeoutException) { - logger.warn("[$accountName] attempt ${retryCount + 1} timeout: $paymentId", e) - shouldRetry = true - paymentESService.update(paymentId) { - it.logProcessing(false, now(), transactionId, "socket timeout") - } - } catch (e: InterruptedIOException) { - logger.warn("[$accountName] interrupted: $paymentId", e) - shouldRetry = true - - // Здесь мы обновляем состояние оплаты в зависимости от результата в базе данных оплат. - // Это требуется сделать ВО ВСЕХ ИСХОДАХ (успешная оплата / неуспешная / ошибочная ситуация) - paymentESService.update(paymentId) { - it.logProcessing(false, now(), transactionId, "interrupted IO") - } - } catch (e: IOException) { - logger.warn("[$accountName] io error: $paymentId", e) - shouldRetry = true - paymentESService.update(paymentId) { - it.logProcessing(false, now(), transactionId, "io exception") - } - } catch (e: Exception) { - logger.error("[$accountName] non-retriable error: $paymentId", e) - shouldRetry = false - paymentESService.update(paymentId) { - it.logProcessing(false, now(), transactionId, e.message) - } - } finally { - timer.record(now() - timeBeforeCall, TimeUnit.MILLISECONDS) - parallelLimiter.release() - } - - if (!shouldRetry) { - return - } - - retryCount++ - if (retryCount >= 3) { - paymentESService.update(paymentId) { - it.logProcessing(false, now(), transactionId, "Max attempts reached") - } - return - } - - val backoff = ((2.0.pow(retryCount.toDouble()) * 25).toLong() + kotlin.random.Random.nextLong(10)) - val capped = backoff.coerceAtMost(deadline - now() - 5) - if (capped <= 0) { - paymentESService.update(paymentId) { - it.logProcessing(false, now(), transactionId, "Deadline expired") - } - return - } - Thread.sleep(capped) + ) + .post(emptyBody) + .build() + + val call = client.newCall(request).enqueue(PaymentCallback( + kotlinx.coroutines.sync.Semaphore(parallelRequests), + accountName, + retryCount, + paymentId, + transactionId, + paymentESService, + client, + request, + timer, + deadline, + timeBeforeCall + )) } } diff --git a/src/main/kotlin/ru/quipy/payments/logic/PaymentService.kt b/src/main/kotlin/ru/quipy/payments/logic/PaymentService.kt index 255db77dd..848f40474 100644 --- a/src/main/kotlin/ru/quipy/payments/logic/PaymentService.kt +++ b/src/main/kotlin/ru/quipy/payments/logic/PaymentService.kt @@ -7,7 +7,7 @@ interface PaymentService { /** * Submit payment request to some external service. */ - fun submitPaymentRequest(paymentId: UUID, amount: Int, paymentStartedAt: Long, deadline: Long) + suspend fun submitPaymentRequest(paymentId: UUID, amount: Int, paymentStartedAt: Long, deadline: Long) } /** diff --git a/src/main/kotlin/ru/quipy/payments/logic/PaymentServiceImpl.kt b/src/main/kotlin/ru/quipy/payments/logic/PaymentServiceImpl.kt index 1c24e5a72..3fb881462 100644 --- a/src/main/kotlin/ru/quipy/payments/logic/PaymentServiceImpl.kt +++ b/src/main/kotlin/ru/quipy/payments/logic/PaymentServiceImpl.kt @@ -1,16 +1,8 @@ package ru.quipy.payments.logic import org.slf4j.LoggerFactory -import org.springframework.beans.factory.annotation.Autowired import org.springframework.stereotype.Service -import ru.quipy.common.utils.NamedThreadFactory -import ru.quipy.core.EventSourcingService -import ru.quipy.payments.api.PaymentAggregate -import java.time.Duration import java.util.* -import java.util.concurrent.Executors -import java.util.concurrent.locks.ReentrantLock -import kotlin.concurrent.withLock @Service @@ -21,7 +13,7 @@ class PaymentSystemImpl( val logger = LoggerFactory.getLogger(PaymentSystemImpl::class.java) } - override fun submitPaymentRequest(paymentId: UUID, amount: Int, paymentStartedAt: Long, deadline: Long) { + override suspend fun submitPaymentRequest(paymentId: UUID, amount: Int, paymentStartedAt: Long, deadline: Long) { for (account in paymentAccounts) { account.performPaymentAsync(paymentId, amount, paymentStartedAt, deadline) } diff --git a/src/main/resources/application.properties b/src/main/resources/application.properties index 3170b6dad..c06d8d823 100644 --- a/src/main/resources/application.properties +++ b/src/main/resources/application.properties @@ -26,6 +26,6 @@ payment.service-name=${PAYMENT_SERVICE_NAME} payment.token=${PAYMENT_TOKEN} # payment.accounts=${PAYMENT_ACCOUNTS:acc-12,acc-20} # payment.accounts=${PAYMENT_ACCOUNTS:acc-3} -payment.accounts=${PAYMENT_ACCOUNTS:acc-7} +payment.accounts=${PAYMENT_ACCOUNTS:acc-9} # payment.accounts=${PAYMENT_ACCOUNTS:acc-18} payment.hostPort=${PAYMENT_HOST:localhost}:${PAYMENT_PORT:1234} From 3f76471a2041eed34cd9d9f0852d9ec99aa099f0 Mon Sep 17 00:00:00 2001 From: vaycheslav Date: Thu, 4 Dec 2025 19:37:06 +0300 Subject: [PATCH 2/9] fix: fixed semaphore acquiring and releasing logic --- .../payments/config/PaymentAccountsConfig.kt | 2 +- .../quipy/payments/logic/PaymentCallback.kt | 39 +++++++++------- .../logic/PaymentExternalServiceImpl.kt | 45 +++++++------------ src/main/resources/application.properties | 4 +- 4 files changed, 44 insertions(+), 46 deletions(-) diff --git a/src/main/kotlin/ru/quipy/payments/config/PaymentAccountsConfig.kt b/src/main/kotlin/ru/quipy/payments/config/PaymentAccountsConfig.kt index e10bd805a..8012d1ee7 100644 --- a/src/main/kotlin/ru/quipy/payments/config/PaymentAccountsConfig.kt +++ b/src/main/kotlin/ru/quipy/payments/config/PaymentAccountsConfig.kt @@ -4,6 +4,7 @@ import com.fasterxml.jackson.databind.ObjectMapper import com.fasterxml.jackson.datatype.jsr310.JavaTimeModule import com.fasterxml.jackson.module.kotlin.registerKotlinModule import io.micrometer.core.instrument.MeterRegistry +import kotlinx.coroutines.sync.Semaphore import org.springframework.beans.factory.annotation.Value import org.springframework.context.annotation.Bean import org.springframework.context.annotation.Configuration @@ -18,7 +19,6 @@ import java.net.http.HttpClient import java.net.http.HttpRequest import java.net.http.HttpResponse import java.util.* -import java.util.concurrent.Semaphore @Configuration diff --git a/src/main/kotlin/ru/quipy/payments/logic/PaymentCallback.kt b/src/main/kotlin/ru/quipy/payments/logic/PaymentCallback.kt index 3b37418c6..f1962a973 100644 --- a/src/main/kotlin/ru/quipy/payments/logic/PaymentCallback.kt +++ b/src/main/kotlin/ru/quipy/payments/logic/PaymentCallback.kt @@ -19,6 +19,7 @@ import kotlin.math.pow class PaymentCallback(val semaphore: Semaphore, val accountName: String, val retryCount: Int, val paymentId: UUID, val transactionId: UUID, val paymentESService: EventSourcingService, val client: OkHttpClient, val request: Request, val timer: Timer, val deadline: Long, val timeBeforeCall: Long) : Callback { + private val remaining: Long = 10_000L //ms override fun onFailure(call: Call, e: java.io.IOException) { logger.debug("fail in callback for payment: {}, retry count: {}, deadline: {}, in time: {}", paymentId, retryCount, deadline, now(), e) @@ -52,6 +53,7 @@ class PaymentCallback(val semaphore: Semaphore, val accountName: String, val ret paymentESService.update(paymentId) { it.logProcessing(false, now(), transactionId, "Max attempts reached") } + semaphore.release() return } @@ -61,30 +63,37 @@ class PaymentCallback(val semaphore: Semaphore, val accountName: String, val ret paymentESService.update(paymentId) { it.logProcessing(false, now(), transactionId, "Deadline expired") } + semaphore.release() return } val nCall = client.newCall(request) nCall.enqueue(PaymentCallback(semaphore, accountName, retryCount + 1, paymentId, transactionId, paymentESService, client, request, timer, deadline, now())) timer.record(now() - timeBeforeCall, TimeUnit.MILLISECONDS) - semaphore.release() - } override fun onResponse(call: Call, response: Response) { + try { + logger.warn("Free space in semaphore: {}", semaphore.availablePermits) + logger.info( + "success in callback for payment: {}, retry count: {}, deadline: {}, in time: {}", + paymentId, + retryCount, + deadline, + now() + ) + val rawBody = response.body?.string() + val parsed = try { + mapper.readValue(rawBody, ExternalSysResponse::class.java) + } catch (ex: Exception) { + ExternalSysResponse(transactionId.toString(), paymentId.toString(), false, ex.message) + } - logger.debug("success in callback for payment: {}, retry count: {}, deadline: {}, in time: {}", paymentId, retryCount, deadline, now()) - - val rawBody = response.body?.string() - val parsed = try { - mapper.readValue(rawBody, ExternalSysResponse::class.java) - } catch (ex: Exception) { - ExternalSysResponse(transactionId.toString(), paymentId.toString(), false, ex.message) - } - - paymentESService.update(paymentId) { - it.logProcessing(parsed.result, now(), transactionId, parsed.message) + paymentESService.update(paymentId) { + it.logProcessing(parsed.result, now(), transactionId, parsed.message) + } + } finally { + semaphore.release() + timer.record(now() - timeBeforeCall, TimeUnit.MILLISECONDS) } } - - fun now() = System.currentTimeMillis() } \ No newline at end of file diff --git a/src/main/kotlin/ru/quipy/payments/logic/PaymentExternalServiceImpl.kt b/src/main/kotlin/ru/quipy/payments/logic/PaymentExternalServiceImpl.kt index a1489a699..a0cc37859 100644 --- a/src/main/kotlin/ru/quipy/payments/logic/PaymentExternalServiceImpl.kt +++ b/src/main/kotlin/ru/quipy/payments/logic/PaymentExternalServiceImpl.kt @@ -3,26 +3,16 @@ package ru.quipy.payments.logic import com.fasterxml.jackson.databind.ObjectMapper import com.fasterxml.jackson.module.kotlin.registerKotlinModule import io.micrometer.core.instrument.MeterRegistry -import kotlinx.coroutines.delay -import kotlinx.coroutines.flow.callbackFlow -import okhttp3.Call -import okhttp3.Callback +import kotlinx.coroutines.sync.Semaphore import okhttp3.OkHttpClient import okhttp3.Request import okhttp3.RequestBody -import okhttp3.Response -import okio.IOException import org.slf4j.LoggerFactory -import org.springframework.boot.autoconfigure.integration.IntegrationProperties import ru.quipy.core.EventSourcingService import ru.quipy.payments.api.PaymentAggregate -import java.io.InterruptedIOException -import java.net.SocketTimeoutException import java.time.Duration import java.util.* -import java.util.concurrent.Semaphore import java.util.concurrent.TimeUnit -import kotlin.math.pow // Advice: always treat time as a Duration @@ -50,11 +40,14 @@ class PaymentExternalSystemAdapterImpl( private val parallelRequests = properties.parallelRequests private val client = OkHttpClient.Builder() - .callTimeout(1100, TimeUnit.MILLISECONDS).build() + .callTimeout(30_000, TimeUnit.MILLISECONDS).build() override fun performPaymentAsync(paymentId: UUID, amount: Int, paymentStartedAt: Long, deadline: Long) { logger.warn("[$accountName] Submitting payment request for payment $paymentId") + + + fun now() = System.currentTimeMillis() val transactionId = UUID.randomUUID() // Вне зависимости от исхода оплаты важно отметить что она была отправлена. @@ -66,7 +59,7 @@ class PaymentExternalSystemAdapterImpl( logger.info("[$accountName] Submit: $paymentId , txId: $transactionId") var retryCount = 0 - while (true) { + val remaining = deadline - now() if (remaining <= 0) { paymentESService.update(paymentId) { @@ -75,16 +68,9 @@ class PaymentExternalSystemAdapterImpl( return } - if (!parallelLimiter.tryAcquire(remaining, TimeUnit.MILLISECONDS)) { - paymentESService.update(paymentId) { - it.logProcessing(false, now(), transactionId, "parallel limiter timeout") - } - return - } - val timeBeforeCall = now() - var shouldRetry = false - val perCallTimeoutMs = remaining.coerceAtMost(1100) + remaining.coerceAtMost(30_000L) + tryAcquire(now(), remaining) val request = Request.Builder() .url( "http://$paymentProviderHostPort/external/process" + @@ -94,8 +80,8 @@ class PaymentExternalSystemAdapterImpl( .post(emptyBody) .build() - val call = client.newCall(request).enqueue(PaymentCallback( - kotlinx.coroutines.sync.Semaphore(parallelRequests), + client.newCall(request).enqueue(PaymentCallback( + parallelLimiter, accountName, retryCount, paymentId, @@ -108,15 +94,18 @@ class PaymentExternalSystemAdapterImpl( timeBeforeCall )) } - } override fun price() = properties.price override fun isEnabled() = properties.enabled override fun name() = properties.accountName - fun timeToDead(deadline: Long): Long { - return deadline - now() + + fun tryAcquire(startedAt: Long, remaining: Long): Boolean { + while (!parallelLimiter.tryAcquire() && now()-startedAt < remaining) { } + return parallelLimiter.tryAcquire() } + } -public fun now() = System.currentTimeMillis() \ No newline at end of file + +fun now() = System.currentTimeMillis() \ No newline at end of file diff --git a/src/main/resources/application.properties b/src/main/resources/application.properties index c06d8d823..4a60c6257 100644 --- a/src/main/resources/application.properties +++ b/src/main/resources/application.properties @@ -15,7 +15,7 @@ event.sourcing.sagas-enabled=false spring.datasource.hikari.jdbc-url=jdbc:postgresql://${POSTGRES_ADDRESS:localhost}:${POSTGRES_PORT:65432}/postgres spring.datasource.hikari.username=tiny_es spring.datasource.hikari.password=tiny_es - +logger.level.ru.quipy.payments.logic.PaymentCallback=DEBUG spring.datasource.hikari.leak-detection-threshold=2000 management.metrics.web.server.request.autotime.percentiles=0.95 management.metrics.export.prometheus.enabled=true @@ -26,6 +26,6 @@ payment.service-name=${PAYMENT_SERVICE_NAME} payment.token=${PAYMENT_TOKEN} # payment.accounts=${PAYMENT_ACCOUNTS:acc-12,acc-20} # payment.accounts=${PAYMENT_ACCOUNTS:acc-3} -payment.accounts=${PAYMENT_ACCOUNTS:acc-9} +payment.accounts=${PAYMENT_ACCOUNTS:acc-12} # payment.accounts=${PAYMENT_ACCOUNTS:acc-18} payment.hostPort=${PAYMENT_HOST:localhost}:${PAYMENT_PORT:1234} From 6aff9b2e856f3e3d5518736fa2d5a5a0c7ff2509 Mon Sep 17 00:00:00 2001 From: vaycheslav Date: Thu, 4 Dec 2025 20:39:13 +0300 Subject: [PATCH 3/9] fix: fixed rate limiter in order payer and httpCall dispatcher --- .../ru/quipy/payments/logic/OrderPayer.kt | 12 ++++--- .../quipy/payments/logic/PaymentCallback.kt | 7 ++-- .../logic/PaymentExternalServiceImpl.kt | 34 ++++++++++--------- .../ru/quipy/payments/logic/PaymentService.kt | 2 +- 4 files changed, 31 insertions(+), 24 deletions(-) diff --git a/src/main/kotlin/ru/quipy/payments/logic/OrderPayer.kt b/src/main/kotlin/ru/quipy/payments/logic/OrderPayer.kt index 4835a1d26..a929ed6be 100644 --- a/src/main/kotlin/ru/quipy/payments/logic/OrderPayer.kt +++ b/src/main/kotlin/ru/quipy/payments/logic/OrderPayer.kt @@ -1,5 +1,6 @@ package ru.quipy.payments.logic +import io.micrometer.core.instrument.MeterRegistry import org.slf4j.Logger import org.slf4j.LoggerFactory import org.springframework.beans.factory.annotation.Autowired @@ -18,17 +19,19 @@ import java.util.concurrent.ThreadPoolExecutor import java.util.concurrent.TimeUnit @Service -class OrderPayer { +class OrderPayer(meterRegistry: MeterRegistry) { companion object { val logger: Logger = LoggerFactory.getLogger(OrderPayer::class.java) } private val rateLimit: SlidingWindowRateLimiter by lazy { SlidingWindowRateLimiter( - rate = 8, + rate = 1100, window = Duration.ofMillis(1000), ) } + private val plannedRequests = meterRegistry.counter("payment.processing.planned", "accountName", "acc-12") + @Autowired private lateinit var paymentESService: EventSourcingService @@ -39,11 +42,12 @@ class OrderPayer { suspend fun processPayment(orderId: UUID, amount: Int, paymentId: UUID, deadline: Long): Long { val createdAt = System.currentTimeMillis() + plannedRequests.increment() if (!rateLimit.tickBlocking(deadline- System.currentTimeMillis())) { throw TooManyRequestsException(deadline) } - val createdEvent = paymentESService.create { + val createdEvent = paymentESService.create { it.create( paymentId, orderId, @@ -51,7 +55,7 @@ class OrderPayer { ) } - logger.trace("Payment ${createdEvent.paymentId} for order $orderId created.") + logger.trace("Payment {} for order {} created.", createdEvent.paymentId, orderId) paymentService.submitPaymentRequest(paymentId, amount, createdAt, deadline) diff --git a/src/main/kotlin/ru/quipy/payments/logic/PaymentCallback.kt b/src/main/kotlin/ru/quipy/payments/logic/PaymentCallback.kt index f1962a973..92bb53906 100644 --- a/src/main/kotlin/ru/quipy/payments/logic/PaymentCallback.kt +++ b/src/main/kotlin/ru/quipy/payments/logic/PaymentCallback.kt @@ -1,6 +1,7 @@ package ru.quipy.payments.logic import io.micrometer.core.instrument.Timer +import io.micrometer.core.instrument.Counter import kotlinx.coroutines.sync.Semaphore import okhttp3.Call import okhttp3.Callback @@ -18,8 +19,7 @@ import java.util.concurrent.TimeUnit import kotlin.math.pow -class PaymentCallback(val semaphore: Semaphore, val accountName: String, val retryCount: Int, val paymentId: UUID, val transactionId: UUID, val paymentESService: EventSourcingService, val client: OkHttpClient, val request: Request, val timer: Timer, val deadline: Long, val timeBeforeCall: Long) : Callback { - private val remaining: Long = 10_000L //ms +class PaymentCallback(val startedRequestsCounter: Counter, val semaphore: Semaphore, val accountName: String, val retryCount: Int, val paymentId: UUID, val transactionId: UUID, val paymentESService: EventSourcingService, val client: OkHttpClient, val request: Request, val timer: Timer, val deadline: Long, val timeBeforeCall: Long) : Callback { override fun onFailure(call: Call, e: java.io.IOException) { logger.debug("fail in callback for payment: {}, retry count: {}, deadline: {}, in time: {}", paymentId, retryCount, deadline, now(), e) @@ -67,11 +67,12 @@ class PaymentCallback(val semaphore: Semaphore, val accountName: String, val ret return } val nCall = client.newCall(request) - nCall.enqueue(PaymentCallback(semaphore, accountName, retryCount + 1, paymentId, transactionId, paymentESService, client, request, timer, deadline, now())) + nCall.enqueue(PaymentCallback(startedRequestsCounter, semaphore, accountName, retryCount + 1, paymentId, transactionId, paymentESService, client, request, timer, deadline, now())) timer.record(now() - timeBeforeCall, TimeUnit.MILLISECONDS) } override fun onResponse(call: Call, response: Response) { + startedRequestsCounter.increment() try { logger.warn("Free space in semaphore: {}", semaphore.availablePermits) logger.info( diff --git a/src/main/kotlin/ru/quipy/payments/logic/PaymentExternalServiceImpl.kt b/src/main/kotlin/ru/quipy/payments/logic/PaymentExternalServiceImpl.kt index a0cc37859..2c593db00 100644 --- a/src/main/kotlin/ru/quipy/payments/logic/PaymentExternalServiceImpl.kt +++ b/src/main/kotlin/ru/quipy/payments/logic/PaymentExternalServiceImpl.kt @@ -4,9 +4,7 @@ import com.fasterxml.jackson.databind.ObjectMapper import com.fasterxml.jackson.module.kotlin.registerKotlinModule import io.micrometer.core.instrument.MeterRegistry import kotlinx.coroutines.sync.Semaphore -import okhttp3.OkHttpClient -import okhttp3.Request -import okhttp3.RequestBody +import okhttp3.* import org.slf4j.LoggerFactory import ru.quipy.core.EventSourcingService import ru.quipy.payments.api.PaymentAggregate @@ -24,7 +22,6 @@ class PaymentExternalSystemAdapterImpl( meterRegistry: MeterRegistry, private val parallelLimiter: Semaphore ) : PaymentExternalSystemAdapter { - companion object { val logger = LoggerFactory.getLogger(PaymentExternalSystemAdapter::class.java) @@ -32,6 +29,7 @@ class PaymentExternalSystemAdapterImpl( val mapper = ObjectMapper().registerKotlinModule() } // 2025-11-20T20:30:35.780+03:00 INFO 56644 --- [alhost:1234/...] ru.quipy.core.EventSourcingService : Optimistic lock exception. Failed to save event records id: [7dca693e-e811-4b7f-8bce-23e13d952c04-4] + private val startedRequests = meterRegistry.counter("payment.processing.started", "accountName", properties.accountName) private val timer = meterRegistry.timer("payment.external.system.request.latency", "accountName", properties.accountName) private val serviceName = properties.serviceName private val accountName = properties.accountName @@ -39,15 +37,20 @@ class PaymentExternalSystemAdapterImpl( private val rateLimitPerSec = properties.rateLimitPerSec private val parallelRequests = properties.parallelRequests - private val client = OkHttpClient.Builder() - .callTimeout(30_000, TimeUnit.MILLISECONDS).build() - - override fun performPaymentAsync(paymentId: UUID, amount: Int, paymentStartedAt: Long, deadline: Long) { - logger.warn("[$accountName] Submitting payment request for payment $paymentId") + private val dispatcher = Dispatcher().apply { + maxRequestsPerHost = parallelRequests + maxRequests = parallelRequests * 2 + } + private val client = OkHttpClient.Builder() + .dispatcher(dispatcher) + .connectionPool(ConnectionPool(parallelRequests, 6, TimeUnit.MINUTES)) + .callTimeout(30_000, TimeUnit.MILLISECONDS) + .build() - fun now() = System.currentTimeMillis() + override suspend fun performPaymentAsync(paymentId: UUID, amount: Int, paymentStartedAt: Long, deadline: Long) { + logger.warn("[$accountName] Submitting payment request for payment $paymentId") val transactionId = UUID.randomUUID() // Вне зависимости от исхода оплаты важно отметить что она была отправлена. @@ -57,9 +60,6 @@ class PaymentExternalSystemAdapterImpl( } logger.info("[$accountName] Submit: $paymentId , txId: $transactionId") - - var retryCount = 0 - val remaining = deadline - now() if (remaining <= 0) { paymentESService.update(paymentId) { @@ -80,10 +80,12 @@ class PaymentExternalSystemAdapterImpl( .post(emptyBody) .build() - client.newCall(request).enqueue(PaymentCallback( + logger.info("Client connections {}. Semaphore was locked: {} ", client.connectionPool.connectionCount(), parallelRequests-parallelLimiter.availablePermits) + client.newCall(request).enqueue(PaymentCallback( + startedRequests, parallelLimiter, accountName, - retryCount, + 0, paymentId, transactionId, paymentESService, @@ -106,6 +108,6 @@ class PaymentExternalSystemAdapterImpl( return parallelLimiter.tryAcquire() } -} +} fun now() = System.currentTimeMillis() \ No newline at end of file diff --git a/src/main/kotlin/ru/quipy/payments/logic/PaymentService.kt b/src/main/kotlin/ru/quipy/payments/logic/PaymentService.kt index 848f40474..d67aebbf0 100644 --- a/src/main/kotlin/ru/quipy/payments/logic/PaymentService.kt +++ b/src/main/kotlin/ru/quipy/payments/logic/PaymentService.kt @@ -17,7 +17,7 @@ interface PaymentService { */ interface PaymentExternalSystemAdapter { - fun performPaymentAsync(paymentId: UUID, amount: Int, paymentStartedAt: Long, deadline: Long) + suspend fun performPaymentAsync(paymentId: UUID, amount: Int, paymentStartedAt: Long, deadline: Long) fun name(): String From e7fc62c452a31971a3ee34afa8a8cc65efccab05 Mon Sep 17 00:00:00 2001 From: vaycheslav Date: Thu, 4 Dec 2025 21:07:22 +0300 Subject: [PATCH 4/9] fix: fixed rate limiter in order payer and httpCall dispatcher --- src/main/kotlin/ru/quipy/payments/logic/PaymentCallback.kt | 2 ++ src/main/resources/application.properties | 1 + 2 files changed, 3 insertions(+) diff --git a/src/main/kotlin/ru/quipy/payments/logic/PaymentCallback.kt b/src/main/kotlin/ru/quipy/payments/logic/PaymentCallback.kt index 92bb53906..87e6f4103 100644 --- a/src/main/kotlin/ru/quipy/payments/logic/PaymentCallback.kt +++ b/src/main/kotlin/ru/quipy/payments/logic/PaymentCallback.kt @@ -53,6 +53,7 @@ class PaymentCallback(val startedRequestsCounter: Counter, val semaphore: Semaph paymentESService.update(paymentId) { it.logProcessing(false, now(), transactionId, "Max attempts reached") } + startedRequestsCounter.increment() semaphore.release() return } @@ -63,6 +64,7 @@ class PaymentCallback(val startedRequestsCounter: Counter, val semaphore: Semaph paymentESService.update(paymentId) { it.logProcessing(false, now(), transactionId, "Deadline expired") } + startedRequestsCounter.increment() semaphore.release() return } diff --git a/src/main/resources/application.properties b/src/main/resources/application.properties index 4a60c6257..bdbe25f61 100644 --- a/src/main/resources/application.properties +++ b/src/main/resources/application.properties @@ -29,3 +29,4 @@ payment.token=${PAYMENT_TOKEN} payment.accounts=${PAYMENT_ACCOUNTS:acc-12} # payment.accounts=${PAYMENT_ACCOUNTS:acc-18} payment.hostPort=${PAYMENT_HOST:localhost}:${PAYMENT_PORT:1234} + From d1d6e88b5566d2443087c169dcd61bacd461f468 Mon Sep 17 00:00:00 2001 From: vaycheslav Date: Thu, 4 Dec 2025 21:49:33 +0300 Subject: [PATCH 5/9] fix: fixed timeout --- .../ru/quipy/payments/logic/PaymentExternalServiceImpl.kt | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/src/main/kotlin/ru/quipy/payments/logic/PaymentExternalServiceImpl.kt b/src/main/kotlin/ru/quipy/payments/logic/PaymentExternalServiceImpl.kt index 2c593db00..f780addf3 100644 --- a/src/main/kotlin/ru/quipy/payments/logic/PaymentExternalServiceImpl.kt +++ b/src/main/kotlin/ru/quipy/payments/logic/PaymentExternalServiceImpl.kt @@ -28,6 +28,8 @@ class PaymentExternalSystemAdapterImpl( val emptyBody = RequestBody.create(null, ByteArray(0)) val mapper = ObjectMapper().registerKotlinModule() } + + private val remaining = 50_000L // 2025-11-20T20:30:35.780+03:00 INFO 56644 --- [alhost:1234/...] ru.quipy.core.EventSourcingService : Optimistic lock exception. Failed to save event records id: [7dca693e-e811-4b7f-8bce-23e13d952c04-4] private val startedRequests = meterRegistry.counter("payment.processing.started", "accountName", properties.accountName) private val timer = meterRegistry.timer("payment.external.system.request.latency", "accountName", properties.accountName) @@ -45,7 +47,7 @@ class PaymentExternalSystemAdapterImpl( private val client = OkHttpClient.Builder() .dispatcher(dispatcher) .connectionPool(ConnectionPool(parallelRequests, 6, TimeUnit.MINUTES)) - .callTimeout(30_000, TimeUnit.MILLISECONDS) + .callTimeout(remaining, TimeUnit.MILLISECONDS) .build() @@ -69,7 +71,7 @@ class PaymentExternalSystemAdapterImpl( } val timeBeforeCall = now() - remaining.coerceAtMost(30_000L) + remaining.coerceAtMost(remaining) tryAcquire(now(), remaining) val request = Request.Builder() .url( From f24804d964b4bccd5e9ca6910d51dcddb1c7a343 Mon Sep 17 00:00:00 2001 From: vaycheslav Date: Fri, 5 Dec 2025 16:39:51 +0300 Subject: [PATCH 6/9] fix: added http2 client --- .../logic/PaymentExternalServiceImpl.kt | 144 ++++++++++++------ 1 file changed, 99 insertions(+), 45 deletions(-) diff --git a/src/main/kotlin/ru/quipy/payments/logic/PaymentExternalServiceImpl.kt b/src/main/kotlin/ru/quipy/payments/logic/PaymentExternalServiceImpl.kt index f780addf3..07b348927 100644 --- a/src/main/kotlin/ru/quipy/payments/logic/PaymentExternalServiceImpl.kt +++ b/src/main/kotlin/ru/quipy/payments/logic/PaymentExternalServiceImpl.kt @@ -4,12 +4,19 @@ import com.fasterxml.jackson.databind.ObjectMapper import com.fasterxml.jackson.module.kotlin.registerKotlinModule import io.micrometer.core.instrument.MeterRegistry import kotlinx.coroutines.sync.Semaphore -import okhttp3.* +import okhttp3.RequestBody import org.slf4j.LoggerFactory import ru.quipy.core.EventSourcingService import ru.quipy.payments.api.PaymentAggregate +import java.io.InterruptedIOException +import java.net.SocketTimeoutException +import java.net.URI +import java.net.http.HttpClient +import java.net.http.HttpRequest +import java.net.http.HttpResponse import java.time.Duration import java.util.* +import java.util.concurrent.Executors import java.util.concurrent.TimeUnit @@ -39,15 +46,10 @@ class PaymentExternalSystemAdapterImpl( private val rateLimitPerSec = properties.rateLimitPerSec private val parallelRequests = properties.parallelRequests - private val dispatcher = Dispatcher().apply { - maxRequestsPerHost = parallelRequests - maxRequests = parallelRequests * 2 - } - - private val client = OkHttpClient.Builder() - .dispatcher(dispatcher) - .connectionPool(ConnectionPool(parallelRequests, 6, TimeUnit.MINUTES)) - .callTimeout(remaining, TimeUnit.MILLISECONDS) + private val httpClient = HttpClient + .newBuilder() + .executor(Executors.newFixedThreadPool(parallelRequests)) + .version(HttpClient.Version.HTTP_2) .build() @@ -62,43 +64,28 @@ class PaymentExternalSystemAdapterImpl( } logger.info("[$accountName] Submit: $paymentId , txId: $transactionId") - val remaining = deadline - now() - if (remaining <= 0) { - paymentESService.update(paymentId) { - it.logProcessing(false, now(), transactionId, "deadline expired") - } - return + val remaining = deadline - now() + if (remaining <= 0) { + paymentESService.update(paymentId) { + it.logProcessing(false, now(), transactionId, "deadline expired") } - - val timeBeforeCall = now() - remaining.coerceAtMost(remaining) - tryAcquire(now(), remaining) - val request = Request.Builder() - .url( - "http://$paymentProviderHostPort/external/process" + - "?serviceName=$serviceName&token=$token&accountName=$accountName" + - "&transactionId=$transactionId&paymentId=$paymentId&amount=$amount" - ) - .post(emptyBody) - .build() - - logger.info("Client connections {}. Semaphore was locked: {} ", client.connectionPool.connectionCount(), parallelRequests-parallelLimiter.availablePermits) - client.newCall(request).enqueue(PaymentCallback( - startedRequests, - parallelLimiter, - accountName, - 0, - paymentId, - transactionId, - paymentESService, - client, - request, - timer, - deadline, - timeBeforeCall - )) + return } + val timeBeforeCall = now() + remaining.coerceAtMost(remaining) + tryAcquire(now(), remaining) + val request = HttpRequest.newBuilder() + .uri(URI("http://$paymentProviderHostPort/external/process?serviceName=$serviceName&token=$token&accountName=$accountName&transactionId=$transactionId&paymentId=$paymentId&amount=$amount")) + .timeout(Duration.ofMillis(requestAverageProcessingTime.toMillis())) + .POST(HttpRequest.BodyPublishers.noBody()) + .build() + + val retryCount = 0L; + + completeAction(retryCount, request, paymentId, transactionId, timeBeforeCall) + } + override fun price() = properties.price override fun isEnabled() = properties.enabled @@ -110,6 +97,73 @@ class PaymentExternalSystemAdapterImpl( return parallelLimiter.tryAcquire() } + fun completeAction(retryCount: Long, request: HttpRequest, paymentId: UUID, transactionId: UUID, timeBeforeCall: Long) { + httpClient.sendAsync(request, HttpResponse.BodyHandlers.ofString()) + .whenComplete { response, throwable -> + if (throwable != null) { + val e = throwable.cause + when (throwable.cause) { + is SocketTimeoutException -> { + logger.warn("[$accountName] attempt ${retryCount + 1} timeout: $paymentId", e) + paymentESService.update(paymentId) { + it.logProcessing(false, now(), transactionId, "socket timeout") + } + } + + is InterruptedIOException -> { + logger.warn("[$accountName] interrupted: $paymentId", e) + + // Здесь мы обновляем состояние оплаты в зависимости от результата в базе данных оплат. + // Это требуется сделать ВО ВСЕХ ИСХОДАХ (успешная оплата / неуспешная / ошибочная ситуация) + paymentESService.update(paymentId) { + it.logProcessing(false, now(), transactionId, "interrupted IO") + } + } + + else -> { + logger.warn("[$accountName] io error: $paymentId", e) + paymentESService.update(paymentId) { + it.logProcessing(false, now(), transactionId, "io exception") + } + } + } + + if (retryCount + 1 >= 3) { + paymentESService.update(paymentId) { + it.logProcessing(false, now(), transactionId, "Max attempts reached") + } + startedRequests.increment() + parallelLimiter.release() + } else { + completeAction(retryCount + 1, request, paymentId, transactionId, timeBeforeCall) + } + } else { + startedRequests.increment() + try { + logger.warn("Free space in semaphore: {}", parallelLimiter.availablePermits) + logger.info( + "success in callback for payment: {}, retry count: {}, deadline: {}, in time: {}", + paymentId, + retryCount, + now() + ) + val rawBody = response.body() + val parsed = try { + mapper.readValue(rawBody, ExternalSysResponse::class.java) + } catch (ex: Exception) { + ExternalSysResponse(transactionId.toString(), paymentId.toString(), false, ex.message) + } + + paymentESService.update(paymentId) { + it.logProcessing(parsed.result, now(), transactionId, parsed.message) + } + } finally { + parallelLimiter.release() + timer.record(now() - timeBeforeCall, TimeUnit.MILLISECONDS) + } + } + } + } + } -} fun now() = System.currentTimeMillis() \ No newline at end of file From 9a87128fc15bfa7bd454632437f4be7365a6bb8e Mon Sep 17 00:00:00 2001 From: vaycheslav Date: Fri, 5 Dec 2025 17:02:43 +0300 Subject: [PATCH 7/9] fix: fixed timeouts --- .../ru/quipy/common/utils/SlidingWindowRateLimiter.kt | 4 ++-- .../ru/quipy/payments/logic/PaymentExternalServiceImpl.kt | 8 ++++---- 2 files changed, 6 insertions(+), 6 deletions(-) diff --git a/src/main/kotlin/ru/quipy/common/utils/SlidingWindowRateLimiter.kt b/src/main/kotlin/ru/quipy/common/utils/SlidingWindowRateLimiter.kt index c5bd36a0a..4537b42a8 100644 --- a/src/main/kotlin/ru/quipy/common/utils/SlidingWindowRateLimiter.kt +++ b/src/main/kotlin/ru/quipy/common/utils/SlidingWindowRateLimiter.kt @@ -39,10 +39,10 @@ class SlidingWindowRateLimiter( } } - fun tickBlocking(timeout: Long): Boolean { + suspend fun tickBlocking(timeout: Long): Boolean { val timeStarted = System.currentTimeMillis() while (System.currentTimeMillis()-timeStarted < timeout && !tick()) { - Thread.sleep(2) + delay(2) } return System.currentTimeMillis()-timeStarted < timeout } diff --git a/src/main/kotlin/ru/quipy/payments/logic/PaymentExternalServiceImpl.kt b/src/main/kotlin/ru/quipy/payments/logic/PaymentExternalServiceImpl.kt index 07b348927..92590fffd 100644 --- a/src/main/kotlin/ru/quipy/payments/logic/PaymentExternalServiceImpl.kt +++ b/src/main/kotlin/ru/quipy/payments/logic/PaymentExternalServiceImpl.kt @@ -36,7 +36,7 @@ class PaymentExternalSystemAdapterImpl( val mapper = ObjectMapper().registerKotlinModule() } - private val remaining = 50_000L + private val remaining = 20_000L // 2025-11-20T20:30:35.780+03:00 INFO 56644 --- [alhost:1234/...] ru.quipy.core.EventSourcingService : Optimistic lock exception. Failed to save event records id: [7dca693e-e811-4b7f-8bce-23e13d952c04-4] private val startedRequests = meterRegistry.counter("payment.processing.started", "accountName", properties.accountName) private val timer = meterRegistry.timer("payment.external.system.request.latency", "accountName", properties.accountName) @@ -73,11 +73,11 @@ class PaymentExternalSystemAdapterImpl( } val timeBeforeCall = now() - remaining.coerceAtMost(remaining) + remaining.coerceAtMost(this.remaining) tryAcquire(now(), remaining) val request = HttpRequest.newBuilder() .uri(URI("http://$paymentProviderHostPort/external/process?serviceName=$serviceName&token=$token&accountName=$accountName&transactionId=$transactionId&paymentId=$paymentId&amount=$amount")) - .timeout(Duration.ofMillis(requestAverageProcessingTime.toMillis())) + .timeout(Duration.ofMillis(30_000)) .POST(HttpRequest.BodyPublishers.noBody()) .build() @@ -142,7 +142,7 @@ class PaymentExternalSystemAdapterImpl( try { logger.warn("Free space in semaphore: {}", parallelLimiter.availablePermits) logger.info( - "success in callback for payment: {}, retry count: {}, deadline: {}, in time: {}", + "success in callback for payment: {}, retry count: {}, in time: {}", paymentId, retryCount, now() From b3517ee59408ae454d3db7404d1bb7769d259400 Mon Sep 17 00:00:00 2001 From: vaycheslav Date: Fri, 5 Dec 2025 17:40:30 +0300 Subject: [PATCH 8/9] fix: fixed timeouts and rate limiting --- .../ru/quipy/apigateway/APIController.kt | 4 --- .../ru/quipy/payments/logic/OrderPayer.kt | 20 +-------------- .../logic/PaymentExternalServiceImpl.kt | 25 ++++++++++++++----- 3 files changed, 20 insertions(+), 29 deletions(-) diff --git a/src/main/kotlin/ru/quipy/apigateway/APIController.kt b/src/main/kotlin/ru/quipy/apigateway/APIController.kt index 3720312da..e81d98b98 100644 --- a/src/main/kotlin/ru/quipy/apigateway/APIController.kt +++ b/src/main/kotlin/ru/quipy/apigateway/APIController.kt @@ -6,7 +6,6 @@ import org.springframework.beans.factory.annotation.Qualifier import org.springframework.web.bind.annotation.* import ru.quipy.common.utils.LeakingBucketRateLimiter import ru.quipy.common.utils.RateLimiter -import ru.quipy.exceptions.TooManyRequestsException import ru.quipy.orders.repository.OrderRepository import ru.quipy.payments.logic.OrderPayer import java.time.Duration @@ -59,9 +58,6 @@ class APIController( @PostMapping("/orders/{orderId}/payment") suspend fun payOrder(@PathVariable orderId: UUID, @RequestParam deadline: Long): PaymentSubmissionDto { - if (!rateLimiter.tick()) { - throw TooManyRequestsException(deadline) - } val paymentId = UUID.randomUUID() val order = orderRepository.findById(orderId)?.let { orderRepository.save(it.copy(status = OrderStatus.PAYMENT_IN_PROGRESS)) diff --git a/src/main/kotlin/ru/quipy/payments/logic/OrderPayer.kt b/src/main/kotlin/ru/quipy/payments/logic/OrderPayer.kt index a929ed6be..8861311a1 100644 --- a/src/main/kotlin/ru/quipy/payments/logic/OrderPayer.kt +++ b/src/main/kotlin/ru/quipy/payments/logic/OrderPayer.kt @@ -5,18 +5,9 @@ import org.slf4j.Logger import org.slf4j.LoggerFactory import org.springframework.beans.factory.annotation.Autowired import org.springframework.stereotype.Service -import ru.quipy.common.utils.CallerBlockingRejectedExecutionHandler -import ru.quipy.common.utils.NamedThreadFactory -import ru.quipy.common.utils.SlidingWindowRateLimiter import ru.quipy.core.EventSourcingService -import ru.quipy.exceptions.RateLimitWasBreached -import ru.quipy.exceptions.TooManyRequestsException import ru.quipy.payments.api.PaymentAggregate -import java.time.Duration import java.util.* -import java.util.concurrent.LinkedBlockingQueue -import java.util.concurrent.ThreadPoolExecutor -import java.util.concurrent.TimeUnit @Service class OrderPayer(meterRegistry: MeterRegistry) { @@ -24,12 +15,7 @@ class OrderPayer(meterRegistry: MeterRegistry) { companion object { val logger: Logger = LoggerFactory.getLogger(OrderPayer::class.java) } - private val rateLimit: SlidingWindowRateLimiter by lazy { - SlidingWindowRateLimiter( - rate = 1100, - window = Duration.ofMillis(1000), - ) - } + private val plannedRequests = meterRegistry.counter("payment.processing.planned", "accountName", "acc-12") @@ -43,10 +29,6 @@ class OrderPayer(meterRegistry: MeterRegistry) { suspend fun processPayment(orderId: UUID, amount: Int, paymentId: UUID, deadline: Long): Long { val createdAt = System.currentTimeMillis() plannedRequests.increment() - if (!rateLimit.tickBlocking(deadline- System.currentTimeMillis())) { - throw TooManyRequestsException(deadline) - } - val createdEvent = paymentESService.create { it.create( paymentId, diff --git a/src/main/kotlin/ru/quipy/payments/logic/PaymentExternalServiceImpl.kt b/src/main/kotlin/ru/quipy/payments/logic/PaymentExternalServiceImpl.kt index 92590fffd..f4e209509 100644 --- a/src/main/kotlin/ru/quipy/payments/logic/PaymentExternalServiceImpl.kt +++ b/src/main/kotlin/ru/quipy/payments/logic/PaymentExternalServiceImpl.kt @@ -4,9 +4,10 @@ import com.fasterxml.jackson.databind.ObjectMapper import com.fasterxml.jackson.module.kotlin.registerKotlinModule import io.micrometer.core.instrument.MeterRegistry import kotlinx.coroutines.sync.Semaphore -import okhttp3.RequestBody import org.slf4j.LoggerFactory +import ru.quipy.common.utils.SlidingWindowRateLimiter import ru.quipy.core.EventSourcingService +import ru.quipy.exceptions.TooManyRequestsException import ru.quipy.payments.api.PaymentAggregate import java.io.InterruptedIOException import java.net.SocketTimeoutException @@ -31,8 +32,6 @@ class PaymentExternalSystemAdapterImpl( ) : PaymentExternalSystemAdapter { companion object { val logger = LoggerFactory.getLogger(PaymentExternalSystemAdapter::class.java) - - val emptyBody = RequestBody.create(null, ByteArray(0)) val mapper = ObjectMapper().registerKotlinModule() } @@ -46,6 +45,13 @@ class PaymentExternalSystemAdapterImpl( private val rateLimitPerSec = properties.rateLimitPerSec private val parallelRequests = properties.parallelRequests + private val rateLimit: SlidingWindowRateLimiter by lazy { + SlidingWindowRateLimiter( + rate = rateLimitPerSec.toLong(), + window = Duration.ofMillis(1000), + ) + } + private val httpClient = HttpClient .newBuilder() .executor(Executors.newFixedThreadPool(parallelRequests)) @@ -75,13 +81,16 @@ class PaymentExternalSystemAdapterImpl( val timeBeforeCall = now() remaining.coerceAtMost(this.remaining) tryAcquire(now(), remaining) + if (!rateLimit.tickBlocking(deadline- now())) { + throw TooManyRequestsException(deadline) + } val request = HttpRequest.newBuilder() .uri(URI("http://$paymentProviderHostPort/external/process?serviceName=$serviceName&token=$token&accountName=$accountName&transactionId=$transactionId&paymentId=$paymentId&amount=$amount")) .timeout(Duration.ofMillis(30_000)) .POST(HttpRequest.BodyPublishers.noBody()) .build() - val retryCount = 0L; + val retryCount = 0L completeAction(retryCount, request, paymentId, transactionId, timeBeforeCall) } @@ -93,8 +102,12 @@ class PaymentExternalSystemAdapterImpl( override fun name() = properties.accountName fun tryAcquire(startedAt: Long, remaining: Long): Boolean { - while (!parallelLimiter.tryAcquire() && now()-startedAt < remaining) { } - return parallelLimiter.tryAcquire() + var isAcquired = parallelLimiter.tryAcquire() + while (!isAcquired && now()-startedAt < remaining) { + isAcquired = parallelLimiter.tryAcquire() + } + + return isAcquired } fun completeAction(retryCount: Long, request: HttpRequest, paymentId: UUID, transactionId: UUID, timeBeforeCall: Long) { From f3a52b90326a290930213c36cca4e013303ab93a Mon Sep 17 00:00:00 2001 From: vaycheslav Date: Fri, 5 Dec 2025 20:40:57 +0300 Subject: [PATCH 9/9] fix: added backoff --- .../ru/quipy/apigateway/APIController.kt | 10 ++++--- .../logic/PaymentExternalServiceImpl.kt | 26 ++++++++++++++++--- 2 files changed, 30 insertions(+), 6 deletions(-) diff --git a/src/main/kotlin/ru/quipy/apigateway/APIController.kt b/src/main/kotlin/ru/quipy/apigateway/APIController.kt index e81d98b98..6a3ab4966 100644 --- a/src/main/kotlin/ru/quipy/apigateway/APIController.kt +++ b/src/main/kotlin/ru/quipy/apigateway/APIController.kt @@ -6,6 +6,7 @@ import org.springframework.beans.factory.annotation.Qualifier import org.springframework.web.bind.annotation.* import ru.quipy.common.utils.LeakingBucketRateLimiter import ru.quipy.common.utils.RateLimiter +import ru.quipy.exceptions.TooManyRequestsException import ru.quipy.orders.repository.OrderRepository import ru.quipy.payments.logic.OrderPayer import java.time.Duration @@ -16,13 +17,13 @@ class APIController( private val orderRepository: OrderRepository, private val orderPayer: OrderPayer, @field:Qualifier("parallelLimiter") - private val rateLimiter: RateLimiter = LeakingBucketRateLimiter(1100, Duration.ofSeconds(1), 4400) + private val rateLimiter: RateLimiter = LeakingBucketRateLimiter(1100, Duration.ofSeconds(1), 3300) ) { val logger: Logger = LoggerFactory.getLogger(APIController::class.java) @PostMapping("/users") - fun createUser(@RequestBody req: CreateUserRequest): User { + suspend fun createUser(@RequestBody req: CreateUserRequest): User { return User(UUID.randomUUID(), req.name) } @@ -31,7 +32,7 @@ class APIController( data class User(val id: UUID, val name: String) @PostMapping("/orders") - fun createOrder(@RequestParam userId: UUID, @RequestParam price: Int): Order { + suspend fun createOrder(@RequestParam userId: UUID, @RequestParam price: Int): Order { val order = Order( UUID.randomUUID(), userId, @@ -58,6 +59,9 @@ class APIController( @PostMapping("/orders/{orderId}/payment") suspend fun payOrder(@PathVariable orderId: UUID, @RequestParam deadline: Long): PaymentSubmissionDto { + if (!rateLimiter.tick()) { + throw TooManyRequestsException(deadline) + } val paymentId = UUID.randomUUID() val order = orderRepository.findById(orderId)?.let { orderRepository.save(it.copy(status = OrderStatus.PAYMENT_IN_PROGRESS)) diff --git a/src/main/kotlin/ru/quipy/payments/logic/PaymentExternalServiceImpl.kt b/src/main/kotlin/ru/quipy/payments/logic/PaymentExternalServiceImpl.kt index f4e209509..0f9475fc2 100644 --- a/src/main/kotlin/ru/quipy/payments/logic/PaymentExternalServiceImpl.kt +++ b/src/main/kotlin/ru/quipy/payments/logic/PaymentExternalServiceImpl.kt @@ -2,7 +2,9 @@ package ru.quipy.payments.logic import com.fasterxml.jackson.databind.ObjectMapper import com.fasterxml.jackson.module.kotlin.registerKotlinModule +import com.github.dockerjava.api.model.Link import io.micrometer.core.instrument.MeterRegistry +import kotlinx.coroutines.delay import kotlinx.coroutines.sync.Semaphore import org.slf4j.LoggerFactory import ru.quipy.common.utils.SlidingWindowRateLimiter @@ -19,6 +21,7 @@ import java.time.Duration import java.util.* import java.util.concurrent.Executors import java.util.concurrent.TimeUnit +import kotlin.math.pow // Advice: always treat time as a Duration @@ -92,7 +95,7 @@ class PaymentExternalSystemAdapterImpl( val retryCount = 0L - completeAction(retryCount, request, paymentId, transactionId, timeBeforeCall) + completeAction(retryCount, request, paymentId, transactionId, timeBeforeCall, deadline) } override fun price() = properties.price @@ -110,7 +113,7 @@ class PaymentExternalSystemAdapterImpl( return isAcquired } - fun completeAction(retryCount: Long, request: HttpRequest, paymentId: UUID, transactionId: UUID, timeBeforeCall: Long) { + fun completeAction(retryCount: Long, request: HttpRequest, paymentId: UUID, transactionId: UUID, timeBeforeCall: Long, deadline: Long) { httpClient.sendAsync(request, HttpResponse.BodyHandlers.ofString()) .whenComplete { response, throwable -> if (throwable != null) { @@ -148,7 +151,24 @@ class PaymentExternalSystemAdapterImpl( startedRequests.increment() parallelLimiter.release() } else { - completeAction(retryCount + 1, request, paymentId, transactionId, timeBeforeCall) + val backoff = ((2.0.pow(retryCount.toDouble()) * 25).toLong() + kotlin.random.Random.nextLong(10)) + val capped = backoff.coerceAtMost(deadline - now() - 5) + if (capped <= 0) { + paymentESService.update(paymentId) { + it.logProcessing(false, now(), transactionId, "Deadline expired") + } + startedRequests.increment() + } else { + Thread.sleep(backoff) + completeAction( + retryCount + 1, + request, + paymentId, + transactionId, + timeBeforeCall, + deadline + ) + } } } else { startedRequests.increment()