diff --git a/.gitignore b/.gitignore index ea59400a3..22faa66a0 100644 --- a/.gitignore +++ b/.gitignore @@ -32,4 +32,7 @@ build/ !**/src/test/**/build/ ### VS Code ### -.vscode/ \ No newline at end of file +.vscode/ + +grafana/ +prometheus/ \ No newline at end of file diff --git a/pom.xml b/pom.xml index 554b0bdcd..069fa29ad 100644 --- a/pom.xml +++ b/pom.xml @@ -31,6 +31,11 @@ resilience4j-ratelimiter ${resilience4jVersion} + + io.github.resilience4j + resilience4j-circuitbreaker + ${resilience4jVersion} + org.jetbrains.kotlinx kotlinx-coroutines-core diff --git a/src/main/kotlin/ru/quipy/payments/config/PaymentAccountsConfig.kt b/src/main/kotlin/ru/quipy/payments/config/PaymentAccountsConfig.kt index 97b0bf95c..0cbd4568b 100644 --- a/src/main/kotlin/ru/quipy/payments/config/PaymentAccountsConfig.kt +++ b/src/main/kotlin/ru/quipy/payments/config/PaymentAccountsConfig.kt @@ -152,13 +152,14 @@ class PaymentAccountsConfig { .onEach(::println) .map { PaymentExternalSystemAdapterImpl( - it, - paymentService, - paymentProviderHostPort, - token, - meterRegistry, - parallelLimiter(accountProperties), - rateLimiter = smoothOutIncoming(accountProperties) + properties = it, + paymentESService = paymentService, + paymentProviderHostPort = paymentProviderHostPort, + token = token, + meterRegistry = meterRegistry, + parallelLimiter = parallelLimiter(accountProperties), + rateLimiter = smoothOutIncoming(accountProperties), + circuitBreaker = PaymentCircuitBreakerFactory.forAccount(accountProperties.first()) ) } } diff --git a/src/main/kotlin/ru/quipy/payments/config/PaymentCircuitBreakerFactory.kt b/src/main/kotlin/ru/quipy/payments/config/PaymentCircuitBreakerFactory.kt new file mode 100644 index 000000000..ccf02c638 --- /dev/null +++ b/src/main/kotlin/ru/quipy/payments/config/PaymentCircuitBreakerFactory.kt @@ -0,0 +1,28 @@ +package ru.quipy.payments.config + +import io.github.resilience4j.circuitbreaker.CircuitBreaker +import io.github.resilience4j.circuitbreaker.CircuitBreakerConfig +import ru.quipy.payments.logic.PaymentAccountProperties +import java.time.Duration + +object PaymentCircuitBreakerFactory { + + fun forAccount(props: PaymentAccountProperties): CircuitBreaker { + val avgMs = props.averageProcessingTime.toMillis().coerceAtLeast(1L) + val slowThreshold = Duration.ofMillis((avgMs * 5).coerceAtMost(3000L)) + + val config = CircuitBreakerConfig.custom() + .slidingWindowType(CircuitBreakerConfig.SlidingWindowType.COUNT_BASED) + .slidingWindowSize(20) + .minimumNumberOfCalls(5) + .failureRateThreshold(50f) + .slowCallRateThreshold(80f) + .slowCallDurationThreshold(slowThreshold) + .waitDurationInOpenState(Duration.ofSeconds(3)) + .permittedNumberOfCallsInHalfOpenState(5) + .automaticTransitionFromOpenToHalfOpenEnabled(true) + .build() + + return CircuitBreaker.of("cb-${props.accountName}", config) + } +} \ No newline at end of file diff --git a/src/main/kotlin/ru/quipy/payments/logic/OrderPayer.kt b/src/main/kotlin/ru/quipy/payments/logic/OrderPayer.kt index 9d9440f48..18294359d 100644 --- a/src/main/kotlin/ru/quipy/payments/logic/OrderPayer.kt +++ b/src/main/kotlin/ru/quipy/payments/logic/OrderPayer.kt @@ -11,6 +11,7 @@ import ru.quipy.common.utils.SlidingWindowRateLimiter import ru.quipy.core.EventSourcingService import ru.quipy.payments.api.PaymentAggregate import java.util.* +import java.util.concurrent.CompletableFuture import java.util.concurrent.ThreadPoolExecutor import java.util.concurrent.TimeUnit @@ -42,28 +43,31 @@ class OrderPayer( fun processPayment(orderId: UUID, amount: Int, paymentId: UUID, deadline: Long): Long { val createdAt = System.currentTimeMillis() plannedCounter.increment() - paymentExecutor.submit { - startedCounter.increment() - try { - val esFuture = java.util.concurrent.CompletableFuture.runAsync { - try { - paymentESService.create { - it.create( - paymentId, - orderId, - amount - ) - } - } catch (e: Exception) { - logger.warn("Failed to create payment ES event for $paymentId", e) + + CompletableFuture + .runAsync( + { + startedCounter.increment() + paymentESService.create { + it.create(paymentId, orderId, amount) } + }, + paymentExecutor, + ) + .thenRunAsync( + { + inExecTimer.record(now() - createdAt, TimeUnit.MILLISECONDS) + paymentService.submitPaymentRequest(paymentId, amount, createdAt, deadline) + }, + paymentExecutor, + ) + .whenComplete { _, ex -> + if (ex != null) { + logger.warn("process stopped before submit for payment: $paymentId", ex) } - inExecTimer.record(now()-createdAt, TimeUnit.MILLISECONDS) - paymentService.submitPaymentRequest(paymentId, amount, createdAt, deadline) - } finally { completedCounter.increment() } - } + return createdAt } } \ No newline at end of file diff --git a/src/main/kotlin/ru/quipy/payments/logic/PaymentExternalServiceImpl.kt b/src/main/kotlin/ru/quipy/payments/logic/PaymentExternalServiceImpl.kt index 6590e20c7..fe4ed0cac 100644 --- a/src/main/kotlin/ru/quipy/payments/logic/PaymentExternalServiceImpl.kt +++ b/src/main/kotlin/ru/quipy/payments/logic/PaymentExternalServiceImpl.kt @@ -2,6 +2,7 @@ package ru.quipy.payments.logic import com.fasterxml.jackson.databind.ObjectMapper import com.fasterxml.jackson.module.kotlin.registerKotlinModule +import io.github.resilience4j.circuitbreaker.CircuitBreaker import io.micrometer.core.instrument.MeterRegistry import io.micrometer.core.instrument.Tag import kotlinx.coroutines.CoroutineScope @@ -41,7 +42,8 @@ class PaymentExternalSystemAdapterImpl( private val token: String, meterRegistry: MeterRegistry, private val parallelLimiter: Semaphore, - private val rateLimiter: SlidingWindowRateLimiter + private val rateLimiter: SlidingWindowRateLimiter, + private val circuitBreaker: CircuitBreaker, ) : PaymentExternalSystemAdapter { private val serviceName = properties.serviceName @@ -80,24 +82,6 @@ class PaymentExternalSystemAdapterImpl( listOf(Tag.of("accountName", properties.accountName)), retryRequests ) { it.toDouble() } - - try { - val warmupUri = URI("http://$paymentProviderHostPort/external/accounts?serviceName=$serviceName&token=$token") - val warmupRequest = HttpRequest.newBuilder() - .uri(warmupUri) - .timeout(Duration.ofSeconds(5)) - .GET() - .build() - val futures = (1..10).map { - httpClient.sendAsync(warmupRequest, HttpResponse.BodyHandlers.ofString()) - } - futures.forEach { future -> - try { future.join() } catch (_: Exception) {} - } - logger.info("[$accountName] HTTP client warmup complete") - } catch (e: Exception) { - logger.warn("[$accountName] HTTP warmup failed (non-fatal)", e) - } } @@ -131,6 +115,17 @@ class PaymentExternalSystemAdapterImpl( throw TooManyRequestsException(deadline) } + if (!circuitBreaker.tryAcquirePermission()) { + try { + paymentESService.update(paymentId) { + it.logProcessing(false, now(), transactionId, "Circuit breaker rejected (no permission)") + } + } catch (e: Exception) { + logger.error("accountName: $accountName. Failed to record CB reject for $paymentId", e) + } + return + } + parallelLimiter.acquire() sendAttempt(0, request, paymentId, transactionId, deadline, completed, allowHedge = true) @@ -155,7 +150,13 @@ class PaymentExternalSystemAdapterImpl( return@schedule } + if (!circuitBreaker.tryAcquirePermission()) { + logger.debug("accountName: $accountName. Hedge skipped: CB no permission for $paymentId") + return@schedule + } + parallelLimiter.acquire() + sendAttempt( 0, hedgeRequest, @@ -200,7 +201,34 @@ class PaymentExternalSystemAdapterImpl( if (retryCount > 0) { retryRequests.decrementAndGet() } - timer.record(now() - timeBeforeCall, TimeUnit.MILLISECONDS) + + val callDuration = now() - timeBeforeCall + timer.record(callDuration, TimeUnit.MILLISECONDS) + + if (throwable != null) { + circuitBreaker.onError(callDuration, TimeUnit.MILLISECONDS, throwable) + } else { + try { + val rawBody = response.body() + val parsed = try { + mapper.readValue(rawBody, ExternalSysResponse::class.java) + } catch (ex: Exception) { + ExternalSysResponse(transactionId.toString(), paymentId.toString(), false, ex.message) + } + + if (parsed.result) { + circuitBreaker.onSuccess(callDuration, TimeUnit.MILLISECONDS) + } else { + circuitBreaker.onError( + callDuration, + TimeUnit.MILLISECONDS, + RuntimeException(parsed.message ?: "Payment rejected"), + ) + } + } catch (e: Exception) { + circuitBreaker.onError(callDuration, TimeUnit.MILLISECONDS, e) + } + } if (completed.get()) { return@whenComplete @@ -338,6 +366,12 @@ class PaymentExternalSystemAdapterImpl( // } return@schedule } + + if (!circuitBreaker.tryAcquirePermission()) { + logger.debug("accountName: $accountName. Retry skipped: CB no permission for $paymentId") + return@schedule + } + parallelLimiter.acquire() sendAttempt( retryCount + 1,