diff --git a/src/main/kotlin/ru/quipy/OnlineShopApplication.kt b/src/main/kotlin/ru/quipy/OnlineShopApplication.kt index 9ac22e094..1fcd3f89c 100644 --- a/src/main/kotlin/ru/quipy/OnlineShopApplication.kt +++ b/src/main/kotlin/ru/quipy/OnlineShopApplication.kt @@ -1,5 +1,6 @@ package ru.quipy +import kotlinx.coroutines.asCoroutineDispatcher import org.slf4j.Logger import org.slf4j.LoggerFactory import org.springframework.boot.autoconfigure.SpringBootApplication @@ -13,10 +14,10 @@ class OnlineShopApplication { val log: Logger = LoggerFactory.getLogger(OnlineShopApplication::class.java) companion object { - val appExecutor = Executors.newFixedThreadPool(64, NamedThreadFactory("main-app-executor")) + val appExecutor = Executors.newFixedThreadPool(20_000, NamedThreadFactory("main-app-executor")).asCoroutineDispatcher() } } -fun main(args: Array) { +suspend fun main(args: Array) { runApplication(*args) } diff --git a/src/main/kotlin/ru/quipy/apigateway/APIController.kt b/src/main/kotlin/ru/quipy/apigateway/APIController.kt index c07d96b7a..6a3ab4966 100644 --- a/src/main/kotlin/ru/quipy/apigateway/APIController.kt +++ b/src/main/kotlin/ru/quipy/apigateway/APIController.kt @@ -17,13 +17,13 @@ class APIController( private val orderRepository: OrderRepository, private val orderPayer: OrderPayer, @field:Qualifier("parallelLimiter") - private val rateLimiter: RateLimiter = LeakingBucketRateLimiter(8, Duration.ofSeconds(1), 30) + private val rateLimiter: RateLimiter = LeakingBucketRateLimiter(1100, Duration.ofSeconds(1), 3300) ) { val logger: Logger = LoggerFactory.getLogger(APIController::class.java) @PostMapping("/users") - fun createUser(@RequestBody req: CreateUserRequest): User { + suspend fun createUser(@RequestBody req: CreateUserRequest): User { return User(UUID.randomUUID(), req.name) } @@ -32,7 +32,7 @@ class APIController( data class User(val id: UUID, val name: String) @PostMapping("/orders") - fun createOrder(@RequestParam userId: UUID, @RequestParam price: Int): Order { + suspend fun createOrder(@RequestParam userId: UUID, @RequestParam price: Int): Order { val order = Order( UUID.randomUUID(), userId, @@ -58,7 +58,7 @@ class APIController( } @PostMapping("/orders/{orderId}/payment") - fun payOrder(@PathVariable orderId: UUID, @RequestParam deadline: Long): PaymentSubmissionDto { + suspend fun payOrder(@PathVariable orderId: UUID, @RequestParam deadline: Long): PaymentSubmissionDto { if (!rateLimiter.tick()) { throw TooManyRequestsException(deadline) } diff --git a/src/main/kotlin/ru/quipy/common/utils/SlidingWindowRateLimiter.kt b/src/main/kotlin/ru/quipy/common/utils/SlidingWindowRateLimiter.kt index c5bd36a0a..4537b42a8 100644 --- a/src/main/kotlin/ru/quipy/common/utils/SlidingWindowRateLimiter.kt +++ b/src/main/kotlin/ru/quipy/common/utils/SlidingWindowRateLimiter.kt @@ -39,10 +39,10 @@ class SlidingWindowRateLimiter( } } - fun tickBlocking(timeout: Long): Boolean { + suspend fun tickBlocking(timeout: Long): Boolean { val timeStarted = System.currentTimeMillis() while (System.currentTimeMillis()-timeStarted < timeout && !tick()) { - Thread.sleep(2) + delay(2) } return System.currentTimeMillis()-timeStarted < timeout } diff --git a/src/main/kotlin/ru/quipy/orders/subscribers/PaymentSubscriber.kt b/src/main/kotlin/ru/quipy/orders/subscribers/PaymentSubscriber.kt index a4c312eaf..0b7560804 100644 --- a/src/main/kotlin/ru/quipy/orders/subscribers/PaymentSubscriber.kt +++ b/src/main/kotlin/ru/quipy/orders/subscribers/PaymentSubscriber.kt @@ -36,7 +36,7 @@ class PaymentSubscriber { retryConf = RetryConf(1, RetryFailedStrategy.SKIP_EVENT) ) { `when`(PaymentProcessedEvent::class) { event -> - appExecutor.submit { + appExecutor.run { logger.trace( "Payment results. OrderId ${event.orderId}, succeeded: ${event.success}, txId: ${event.transactionId}, reason: ${event.reason}, duration: ${ Duration.ofMillis( diff --git a/src/main/kotlin/ru/quipy/payments/config/PaymentAccountsConfig.kt b/src/main/kotlin/ru/quipy/payments/config/PaymentAccountsConfig.kt index e10bd805a..8012d1ee7 100644 --- a/src/main/kotlin/ru/quipy/payments/config/PaymentAccountsConfig.kt +++ b/src/main/kotlin/ru/quipy/payments/config/PaymentAccountsConfig.kt @@ -4,6 +4,7 @@ import com.fasterxml.jackson.databind.ObjectMapper import com.fasterxml.jackson.datatype.jsr310.JavaTimeModule import com.fasterxml.jackson.module.kotlin.registerKotlinModule import io.micrometer.core.instrument.MeterRegistry +import kotlinx.coroutines.sync.Semaphore import org.springframework.beans.factory.annotation.Value import org.springframework.context.annotation.Bean import org.springframework.context.annotation.Configuration @@ -18,7 +19,6 @@ import java.net.http.HttpClient import java.net.http.HttpRequest import java.net.http.HttpResponse import java.util.* -import java.util.concurrent.Semaphore @Configuration diff --git a/src/main/kotlin/ru/quipy/payments/logic/OrderPayer.kt b/src/main/kotlin/ru/quipy/payments/logic/OrderPayer.kt index 359aa0a59..8861311a1 100644 --- a/src/main/kotlin/ru/quipy/payments/logic/OrderPayer.kt +++ b/src/main/kotlin/ru/quipy/payments/logic/OrderPayer.kt @@ -1,34 +1,23 @@ package ru.quipy.payments.logic +import io.micrometer.core.instrument.MeterRegistry import org.slf4j.Logger import org.slf4j.LoggerFactory import org.springframework.beans.factory.annotation.Autowired import org.springframework.stereotype.Service -import ru.quipy.common.utils.CallerBlockingRejectedExecutionHandler -import ru.quipy.common.utils.NamedThreadFactory -import ru.quipy.common.utils.SlidingWindowRateLimiter import ru.quipy.core.EventSourcingService -import ru.quipy.exceptions.RateLimitWasBreached -import ru.quipy.exceptions.TooManyRequestsException import ru.quipy.payments.api.PaymentAggregate -import java.time.Duration import java.util.* -import java.util.concurrent.LinkedBlockingQueue -import java.util.concurrent.ThreadPoolExecutor -import java.util.concurrent.TimeUnit @Service -class OrderPayer { +class OrderPayer(meterRegistry: MeterRegistry) { companion object { val logger: Logger = LoggerFactory.getLogger(OrderPayer::class.java) } - private val rateLimit: SlidingWindowRateLimiter by lazy { - SlidingWindowRateLimiter( - rate = 8, - window = Duration.ofMillis(1000), - ) - } + + private val plannedRequests = meterRegistry.counter("payment.processing.planned", "accountName", "acc-12") + @Autowired private lateinit var paymentESService: EventSourcingService @@ -36,33 +25,22 @@ class OrderPayer { @Autowired private lateinit var paymentService: PaymentService - private val paymentExecutor = ThreadPoolExecutor( - 16, - 16, - 0, - TimeUnit.MILLISECONDS, - LinkedBlockingQueue(8_000), - NamedThreadFactory("payment-submission-executor"), - CallerBlockingRejectedExecutionHandler() - ) - fun processPayment(orderId: UUID, amount: Int, paymentId: UUID, deadline: Long): Long { + suspend fun processPayment(orderId: UUID, amount: Int, paymentId: UUID, deadline: Long): Long { val createdAt = System.currentTimeMillis() - if (!rateLimit.tickBlocking(deadline- System.currentTimeMillis())) { - throw TooManyRequestsException(deadline) - } - paymentExecutor.submit { - val createdEvent = paymentESService.create { + plannedRequests.increment() + val createdEvent = paymentESService.create { it.create( paymentId, orderId, amount ) } - logger.trace("Payment ${createdEvent.paymentId} for order $orderId created.") - paymentService.submitPaymentRequest(paymentId, amount, createdAt, deadline) - } + logger.trace("Payment {} for order {} created.", createdEvent.paymentId, orderId) + + paymentService.submitPaymentRequest(paymentId, amount, createdAt, deadline) + return createdAt } } \ No newline at end of file diff --git a/src/main/kotlin/ru/quipy/payments/logic/PaymentCallback.kt b/src/main/kotlin/ru/quipy/payments/logic/PaymentCallback.kt new file mode 100644 index 000000000..87e6f4103 --- /dev/null +++ b/src/main/kotlin/ru/quipy/payments/logic/PaymentCallback.kt @@ -0,0 +1,102 @@ +package ru.quipy.payments.logic + +import io.micrometer.core.instrument.Timer +import io.micrometer.core.instrument.Counter +import kotlinx.coroutines.sync.Semaphore +import okhttp3.Call +import okhttp3.Callback +import okhttp3.OkHttpClient +import okhttp3.Request +import okhttp3.Response +import ru.quipy.core.EventSourcingService +import ru.quipy.payments.api.PaymentAggregate +import ru.quipy.payments.logic.PaymentExternalSystemAdapterImpl.Companion.logger +import ru.quipy.payments.logic.PaymentExternalSystemAdapterImpl.Companion.mapper +import java.io.InterruptedIOException +import java.net.SocketTimeoutException +import java.util.UUID +import java.util.concurrent.TimeUnit +import kotlin.math.pow + + +class PaymentCallback(val startedRequestsCounter: Counter, val semaphore: Semaphore, val accountName: String, val retryCount: Int, val paymentId: UUID, val transactionId: UUID, val paymentESService: EventSourcingService, val client: OkHttpClient, val request: Request, val timer: Timer, val deadline: Long, val timeBeforeCall: Long) : Callback { + override fun onFailure(call: Call, e: java.io.IOException) { + + logger.debug("fail in callback for payment: {}, retry count: {}, deadline: {}, in time: {}", paymentId, retryCount, deadline, now(), e) + when (e) { + is SocketTimeoutException -> { + logger.warn("[$accountName] attempt ${retryCount + 1} timeout: $paymentId", e) + paymentESService.update(paymentId) { + it.logProcessing(false, now(), transactionId, "socket timeout") + } + } + + is InterruptedIOException -> { + logger.warn("[$accountName] interrupted: $paymentId", e) + + // Здесь мы обновляем состояние оплаты в зависимости от результата в базе данных оплат. + // Это требуется сделать ВО ВСЕХ ИСХОДАХ (успешная оплата / неуспешная / ошибочная ситуация) + paymentESService.update(paymentId) { + it.logProcessing(false, now(), transactionId, "interrupted IO") + } + } + + else -> { + logger.warn("[$accountName] io error: $paymentId", e) + paymentESService.update(paymentId) { + it.logProcessing(false, now(), transactionId, "io exception") + } + } + } + + if (retryCount + 1 >= 3) { + paymentESService.update(paymentId) { + it.logProcessing(false, now(), transactionId, "Max attempts reached") + } + startedRequestsCounter.increment() + semaphore.release() + return + } + + val backoff = ((2.0.pow(retryCount.toDouble()) * 25).toLong() + kotlin.random.Random.nextLong(10)) + val capped = backoff.coerceAtMost(deadline - now() - 5) + if (capped <= 0) { + paymentESService.update(paymentId) { + it.logProcessing(false, now(), transactionId, "Deadline expired") + } + startedRequestsCounter.increment() + semaphore.release() + return + } + val nCall = client.newCall(request) + nCall.enqueue(PaymentCallback(startedRequestsCounter, semaphore, accountName, retryCount + 1, paymentId, transactionId, paymentESService, client, request, timer, deadline, now())) + timer.record(now() - timeBeforeCall, TimeUnit.MILLISECONDS) + } + + override fun onResponse(call: Call, response: Response) { + startedRequestsCounter.increment() + try { + logger.warn("Free space in semaphore: {}", semaphore.availablePermits) + logger.info( + "success in callback for payment: {}, retry count: {}, deadline: {}, in time: {}", + paymentId, + retryCount, + deadline, + now() + ) + val rawBody = response.body?.string() + val parsed = try { + mapper.readValue(rawBody, ExternalSysResponse::class.java) + } catch (ex: Exception) { + ExternalSysResponse(transactionId.toString(), paymentId.toString(), false, ex.message) + } + + paymentESService.update(paymentId) { + it.logProcessing(parsed.result, now(), transactionId, parsed.message) + } + } finally { + semaphore.release() + timer.record(now() - timeBeforeCall, TimeUnit.MILLISECONDS) + } + } +} \ No newline at end of file diff --git a/src/main/kotlin/ru/quipy/payments/logic/PaymentExternalServiceImpl.kt b/src/main/kotlin/ru/quipy/payments/logic/PaymentExternalServiceImpl.kt index f0e84098c..0f9475fc2 100644 --- a/src/main/kotlin/ru/quipy/payments/logic/PaymentExternalServiceImpl.kt +++ b/src/main/kotlin/ru/quipy/payments/logic/PaymentExternalServiceImpl.kt @@ -2,19 +2,24 @@ package ru.quipy.payments.logic import com.fasterxml.jackson.databind.ObjectMapper import com.fasterxml.jackson.module.kotlin.registerKotlinModule +import com.github.dockerjava.api.model.Link import io.micrometer.core.instrument.MeterRegistry -import okhttp3.OkHttpClient -import okhttp3.Request -import okhttp3.RequestBody -import okio.IOException +import kotlinx.coroutines.delay +import kotlinx.coroutines.sync.Semaphore import org.slf4j.LoggerFactory +import ru.quipy.common.utils.SlidingWindowRateLimiter import ru.quipy.core.EventSourcingService +import ru.quipy.exceptions.TooManyRequestsException import ru.quipy.payments.api.PaymentAggregate import java.io.InterruptedIOException import java.net.SocketTimeoutException +import java.net.URI +import java.net.http.HttpClient +import java.net.http.HttpRequest +import java.net.http.HttpResponse import java.time.Duration import java.util.* -import java.util.concurrent.Semaphore +import java.util.concurrent.Executors import java.util.concurrent.TimeUnit import kotlin.math.pow @@ -28,14 +33,14 @@ class PaymentExternalSystemAdapterImpl( meterRegistry: MeterRegistry, private val parallelLimiter: Semaphore ) : PaymentExternalSystemAdapter { - companion object { val logger = LoggerFactory.getLogger(PaymentExternalSystemAdapter::class.java) - - val emptyBody = RequestBody.create(null, ByteArray(0)) val mapper = ObjectMapper().registerKotlinModule() } + private val remaining = 20_000L + // 2025-11-20T20:30:35.780+03:00 INFO 56644 --- [alhost:1234/...] ru.quipy.core.EventSourcingService : Optimistic lock exception. Failed to save event records id: [7dca693e-e811-4b7f-8bce-23e13d952c04-4] + private val startedRequests = meterRegistry.counter("payment.processing.started", "accountName", properties.accountName) private val timer = meterRegistry.timer("payment.external.system.request.latency", "accountName", properties.accountName) private val serviceName = properties.serviceName private val accountName = properties.accountName @@ -43,12 +48,22 @@ class PaymentExternalSystemAdapterImpl( private val rateLimitPerSec = properties.rateLimitPerSec private val parallelRequests = properties.parallelRequests - private val client = OkHttpClient.Builder() - .callTimeout(1100, TimeUnit.MILLISECONDS).build() + private val rateLimit: SlidingWindowRateLimiter by lazy { + SlidingWindowRateLimiter( + rate = rateLimitPerSec.toLong(), + window = Duration.ofMillis(1000), + ) + } + + private val httpClient = HttpClient + .newBuilder() + .executor(Executors.newFixedThreadPool(parallelRequests)) + .version(HttpClient.Version.HTTP_2) + .build() - override fun performPaymentAsync(paymentId: UUID, amount: Int, paymentStartedAt: Long, deadline: Long) { - logger.warn("[$accountName] Submitting payment request for payment $paymentId") + override suspend fun performPaymentAsync(paymentId: UUID, amount: Int, paymentStartedAt: Long, deadline: Long) { + logger.warn("[$accountName] Submitting payment request for payment $paymentId") val transactionId = UUID.randomUUID() // Вне зависимости от исхода оплаты важно отметить что она была отправлена. @@ -58,121 +73,130 @@ class PaymentExternalSystemAdapterImpl( } logger.info("[$accountName] Submit: $paymentId , txId: $transactionId") - - var retryCount = 0 - while (true) { - val remaining = deadline - now() - if (remaining <= 0) { - paymentESService.update(paymentId) { - it.logProcessing(false, now(), transactionId, "deadline expired") - } - return + val remaining = deadline - now() + if (remaining <= 0) { + paymentESService.update(paymentId) { + it.logProcessing(false, now(), transactionId, "deadline expired") } + return + } - if (!parallelLimiter.tryAcquire(remaining, TimeUnit.MILLISECONDS)) { - paymentESService.update(paymentId) { - it.logProcessing(false, now(), transactionId, "parallel limiter timeout") - } - return - } + val timeBeforeCall = now() + remaining.coerceAtMost(this.remaining) + tryAcquire(now(), remaining) + if (!rateLimit.tickBlocking(deadline- now())) { + throw TooManyRequestsException(deadline) + } + val request = HttpRequest.newBuilder() + .uri(URI("http://$paymentProviderHostPort/external/process?serviceName=$serviceName&token=$token&accountName=$accountName&transactionId=$transactionId&paymentId=$paymentId&amount=$amount")) + .timeout(Duration.ofMillis(30_000)) + .POST(HttpRequest.BodyPublishers.noBody()) + .build() - val timeBeforeCall = now() - var shouldRetry = false - try { - val perCallTimeoutMs = remaining.coerceAtMost(1100) - val request = Request.Builder() - .url( - "http://$paymentProviderHostPort/external/process" + - "?serviceName=$serviceName&token=$token&accountName=$accountName" + - "&transactionId=$transactionId&paymentId=$paymentId&amount=$amount" - ) - .post(emptyBody) - .build() - - val call = client.newCall(request) - call.timeout().timeout(perCallTimeoutMs, TimeUnit.MILLISECONDS) - - call.execute().use { response -> - val rawBody = response.body?.string() - val parsed = try { - mapper.readValue(rawBody, ExternalSysResponse::class.java) - } catch (ex: Exception) { - ExternalSysResponse(transactionId.toString(), paymentId.toString(), false, ex.message) - } + val retryCount = 0L - shouldRetry = !parsed.result && (response.code == 429 || response.code >= 500) + completeAction(retryCount, request, paymentId, transactionId, timeBeforeCall, deadline) + } - paymentESService.update(paymentId) { - it.logProcessing(parsed.result, now(), transactionId, parsed.message) - } + override fun price() = properties.price - if (parsed.result) { - return - } - } - } catch (e: SocketTimeoutException) { - logger.warn("[$accountName] attempt ${retryCount + 1} timeout: $paymentId", e) - shouldRetry = true - paymentESService.update(paymentId) { - it.logProcessing(false, now(), transactionId, "socket timeout") - } - } catch (e: InterruptedIOException) { - logger.warn("[$accountName] interrupted: $paymentId", e) - shouldRetry = true - - // Здесь мы обновляем состояние оплаты в зависимости от результата в базе данных оплат. - // Это требуется сделать ВО ВСЕХ ИСХОДАХ (успешная оплата / неуспешная / ошибочная ситуация) - paymentESService.update(paymentId) { - it.logProcessing(false, now(), transactionId, "interrupted IO") - } - } catch (e: IOException) { - logger.warn("[$accountName] io error: $paymentId", e) - shouldRetry = true - paymentESService.update(paymentId) { - it.logProcessing(false, now(), transactionId, "io exception") - } - } catch (e: Exception) { - logger.error("[$accountName] non-retriable error: $paymentId", e) - shouldRetry = false - paymentESService.update(paymentId) { - it.logProcessing(false, now(), transactionId, e.message) - } - } finally { - timer.record(now() - timeBeforeCall, TimeUnit.MILLISECONDS) - parallelLimiter.release() - } + override fun isEnabled() = properties.enabled - if (!shouldRetry) { - return - } + override fun name() = properties.accountName - retryCount++ - if (retryCount >= 3) { - paymentESService.update(paymentId) { - it.logProcessing(false, now(), transactionId, "Max attempts reached") - } - return - } + fun tryAcquire(startedAt: Long, remaining: Long): Boolean { + var isAcquired = parallelLimiter.tryAcquire() + while (!isAcquired && now()-startedAt < remaining) { + isAcquired = parallelLimiter.tryAcquire() + } - val backoff = ((2.0.pow(retryCount.toDouble()) * 25).toLong() + kotlin.random.Random.nextLong(10)) - val capped = backoff.coerceAtMost(deadline - now() - 5) - if (capped <= 0) { - paymentESService.update(paymentId) { - it.logProcessing(false, now(), transactionId, "Deadline expired") + return isAcquired + } + + fun completeAction(retryCount: Long, request: HttpRequest, paymentId: UUID, transactionId: UUID, timeBeforeCall: Long, deadline: Long) { + httpClient.sendAsync(request, HttpResponse.BodyHandlers.ofString()) + .whenComplete { response, throwable -> + if (throwable != null) { + val e = throwable.cause + when (throwable.cause) { + is SocketTimeoutException -> { + logger.warn("[$accountName] attempt ${retryCount + 1} timeout: $paymentId", e) + paymentESService.update(paymentId) { + it.logProcessing(false, now(), transactionId, "socket timeout") + } + } + + is InterruptedIOException -> { + logger.warn("[$accountName] interrupted: $paymentId", e) + + // Здесь мы обновляем состояние оплаты в зависимости от результата в базе данных оплат. + // Это требуется сделать ВО ВСЕХ ИСХОДАХ (успешная оплата / неуспешная / ошибочная ситуация) + paymentESService.update(paymentId) { + it.logProcessing(false, now(), transactionId, "interrupted IO") + } + } + + else -> { + logger.warn("[$accountName] io error: $paymentId", e) + paymentESService.update(paymentId) { + it.logProcessing(false, now(), transactionId, "io exception") + } + } + } + + if (retryCount + 1 >= 3) { + paymentESService.update(paymentId) { + it.logProcessing(false, now(), transactionId, "Max attempts reached") + } + startedRequests.increment() + parallelLimiter.release() + } else { + val backoff = ((2.0.pow(retryCount.toDouble()) * 25).toLong() + kotlin.random.Random.nextLong(10)) + val capped = backoff.coerceAtMost(deadline - now() - 5) + if (capped <= 0) { + paymentESService.update(paymentId) { + it.logProcessing(false, now(), transactionId, "Deadline expired") + } + startedRequests.increment() + } else { + Thread.sleep(backoff) + completeAction( + retryCount + 1, + request, + paymentId, + transactionId, + timeBeforeCall, + deadline + ) + } + } + } else { + startedRequests.increment() + try { + logger.warn("Free space in semaphore: {}", parallelLimiter.availablePermits) + logger.info( + "success in callback for payment: {}, retry count: {}, in time: {}", + paymentId, + retryCount, + now() + ) + val rawBody = response.body() + val parsed = try { + mapper.readValue(rawBody, ExternalSysResponse::class.java) + } catch (ex: Exception) { + ExternalSysResponse(transactionId.toString(), paymentId.toString(), false, ex.message) + } + + paymentESService.update(paymentId) { + it.logProcessing(parsed.result, now(), transactionId, parsed.message) + } + } finally { + parallelLimiter.release() + timer.record(now() - timeBeforeCall, TimeUnit.MILLISECONDS) + } + } } - return } - Thread.sleep(capped) - } } - override fun price() = properties.price - - override fun isEnabled() = properties.enabled - - override fun name() = properties.accountName - fun timeToDead(deadline: Long): Long { - return deadline - now() - } -} -public fun now() = System.currentTimeMillis() \ No newline at end of file +fun now() = System.currentTimeMillis() \ No newline at end of file diff --git a/src/main/kotlin/ru/quipy/payments/logic/PaymentService.kt b/src/main/kotlin/ru/quipy/payments/logic/PaymentService.kt index 255db77dd..d67aebbf0 100644 --- a/src/main/kotlin/ru/quipy/payments/logic/PaymentService.kt +++ b/src/main/kotlin/ru/quipy/payments/logic/PaymentService.kt @@ -7,7 +7,7 @@ interface PaymentService { /** * Submit payment request to some external service. */ - fun submitPaymentRequest(paymentId: UUID, amount: Int, paymentStartedAt: Long, deadline: Long) + suspend fun submitPaymentRequest(paymentId: UUID, amount: Int, paymentStartedAt: Long, deadline: Long) } /** @@ -17,7 +17,7 @@ interface PaymentService { */ interface PaymentExternalSystemAdapter { - fun performPaymentAsync(paymentId: UUID, amount: Int, paymentStartedAt: Long, deadline: Long) + suspend fun performPaymentAsync(paymentId: UUID, amount: Int, paymentStartedAt: Long, deadline: Long) fun name(): String diff --git a/src/main/kotlin/ru/quipy/payments/logic/PaymentServiceImpl.kt b/src/main/kotlin/ru/quipy/payments/logic/PaymentServiceImpl.kt index 1c24e5a72..3fb881462 100644 --- a/src/main/kotlin/ru/quipy/payments/logic/PaymentServiceImpl.kt +++ b/src/main/kotlin/ru/quipy/payments/logic/PaymentServiceImpl.kt @@ -1,16 +1,8 @@ package ru.quipy.payments.logic import org.slf4j.LoggerFactory -import org.springframework.beans.factory.annotation.Autowired import org.springframework.stereotype.Service -import ru.quipy.common.utils.NamedThreadFactory -import ru.quipy.core.EventSourcingService -import ru.quipy.payments.api.PaymentAggregate -import java.time.Duration import java.util.* -import java.util.concurrent.Executors -import java.util.concurrent.locks.ReentrantLock -import kotlin.concurrent.withLock @Service @@ -21,7 +13,7 @@ class PaymentSystemImpl( val logger = LoggerFactory.getLogger(PaymentSystemImpl::class.java) } - override fun submitPaymentRequest(paymentId: UUID, amount: Int, paymentStartedAt: Long, deadline: Long) { + override suspend fun submitPaymentRequest(paymentId: UUID, amount: Int, paymentStartedAt: Long, deadline: Long) { for (account in paymentAccounts) { account.performPaymentAsync(paymentId, amount, paymentStartedAt, deadline) } diff --git a/src/main/resources/application.properties b/src/main/resources/application.properties index 3170b6dad..bdbe25f61 100644 --- a/src/main/resources/application.properties +++ b/src/main/resources/application.properties @@ -15,7 +15,7 @@ event.sourcing.sagas-enabled=false spring.datasource.hikari.jdbc-url=jdbc:postgresql://${POSTGRES_ADDRESS:localhost}:${POSTGRES_PORT:65432}/postgres spring.datasource.hikari.username=tiny_es spring.datasource.hikari.password=tiny_es - +logger.level.ru.quipy.payments.logic.PaymentCallback=DEBUG spring.datasource.hikari.leak-detection-threshold=2000 management.metrics.web.server.request.autotime.percentiles=0.95 management.metrics.export.prometheus.enabled=true @@ -26,6 +26,7 @@ payment.service-name=${PAYMENT_SERVICE_NAME} payment.token=${PAYMENT_TOKEN} # payment.accounts=${PAYMENT_ACCOUNTS:acc-12,acc-20} # payment.accounts=${PAYMENT_ACCOUNTS:acc-3} -payment.accounts=${PAYMENT_ACCOUNTS:acc-7} +payment.accounts=${PAYMENT_ACCOUNTS:acc-12} # payment.accounts=${PAYMENT_ACCOUNTS:acc-18} payment.hostPort=${PAYMENT_HOST:localhost}:${PAYMENT_PORT:1234} +