diff --git a/.gitignore b/.gitignore
index ea59400a3..22faa66a0 100644
--- a/.gitignore
+++ b/.gitignore
@@ -32,4 +32,7 @@ build/
!**/src/test/**/build/
### VS Code ###
-.vscode/
\ No newline at end of file
+.vscode/
+
+grafana/
+prometheus/
\ No newline at end of file
diff --git a/pom.xml b/pom.xml
index 554b0bdcd..069fa29ad 100644
--- a/pom.xml
+++ b/pom.xml
@@ -31,6 +31,11 @@
resilience4j-ratelimiter
${resilience4jVersion}
+
+ io.github.resilience4j
+ resilience4j-circuitbreaker
+ ${resilience4jVersion}
+
org.jetbrains.kotlinx
kotlinx-coroutines-core
diff --git a/src/main/kotlin/ru/quipy/payments/config/PaymentAccountsConfig.kt b/src/main/kotlin/ru/quipy/payments/config/PaymentAccountsConfig.kt
index 97b0bf95c..0cbd4568b 100644
--- a/src/main/kotlin/ru/quipy/payments/config/PaymentAccountsConfig.kt
+++ b/src/main/kotlin/ru/quipy/payments/config/PaymentAccountsConfig.kt
@@ -152,13 +152,14 @@ class PaymentAccountsConfig {
.onEach(::println)
.map {
PaymentExternalSystemAdapterImpl(
- it,
- paymentService,
- paymentProviderHostPort,
- token,
- meterRegistry,
- parallelLimiter(accountProperties),
- rateLimiter = smoothOutIncoming(accountProperties)
+ properties = it,
+ paymentESService = paymentService,
+ paymentProviderHostPort = paymentProviderHostPort,
+ token = token,
+ meterRegistry = meterRegistry,
+ parallelLimiter = parallelLimiter(accountProperties),
+ rateLimiter = smoothOutIncoming(accountProperties),
+ circuitBreaker = PaymentCircuitBreakerFactory.forAccount(accountProperties.first())
)
}
}
diff --git a/src/main/kotlin/ru/quipy/payments/config/PaymentCircuitBreakerFactory.kt b/src/main/kotlin/ru/quipy/payments/config/PaymentCircuitBreakerFactory.kt
new file mode 100644
index 000000000..ccf02c638
--- /dev/null
+++ b/src/main/kotlin/ru/quipy/payments/config/PaymentCircuitBreakerFactory.kt
@@ -0,0 +1,28 @@
+package ru.quipy.payments.config
+
+import io.github.resilience4j.circuitbreaker.CircuitBreaker
+import io.github.resilience4j.circuitbreaker.CircuitBreakerConfig
+import ru.quipy.payments.logic.PaymentAccountProperties
+import java.time.Duration
+
+object PaymentCircuitBreakerFactory {
+
+ fun forAccount(props: PaymentAccountProperties): CircuitBreaker {
+ val avgMs = props.averageProcessingTime.toMillis().coerceAtLeast(1L)
+ val slowThreshold = Duration.ofMillis((avgMs * 5).coerceAtMost(3000L))
+
+ val config = CircuitBreakerConfig.custom()
+ .slidingWindowType(CircuitBreakerConfig.SlidingWindowType.COUNT_BASED)
+ .slidingWindowSize(20)
+ .minimumNumberOfCalls(5)
+ .failureRateThreshold(50f)
+ .slowCallRateThreshold(80f)
+ .slowCallDurationThreshold(slowThreshold)
+ .waitDurationInOpenState(Duration.ofSeconds(3))
+ .permittedNumberOfCallsInHalfOpenState(5)
+ .automaticTransitionFromOpenToHalfOpenEnabled(true)
+ .build()
+
+ return CircuitBreaker.of("cb-${props.accountName}", config)
+ }
+}
\ No newline at end of file
diff --git a/src/main/kotlin/ru/quipy/payments/logic/OrderPayer.kt b/src/main/kotlin/ru/quipy/payments/logic/OrderPayer.kt
index 9d9440f48..18294359d 100644
--- a/src/main/kotlin/ru/quipy/payments/logic/OrderPayer.kt
+++ b/src/main/kotlin/ru/quipy/payments/logic/OrderPayer.kt
@@ -11,6 +11,7 @@ import ru.quipy.common.utils.SlidingWindowRateLimiter
import ru.quipy.core.EventSourcingService
import ru.quipy.payments.api.PaymentAggregate
import java.util.*
+import java.util.concurrent.CompletableFuture
import java.util.concurrent.ThreadPoolExecutor
import java.util.concurrent.TimeUnit
@@ -42,28 +43,31 @@ class OrderPayer(
fun processPayment(orderId: UUID, amount: Int, paymentId: UUID, deadline: Long): Long {
val createdAt = System.currentTimeMillis()
plannedCounter.increment()
- paymentExecutor.submit {
- startedCounter.increment()
- try {
- val esFuture = java.util.concurrent.CompletableFuture.runAsync {
- try {
- paymentESService.create {
- it.create(
- paymentId,
- orderId,
- amount
- )
- }
- } catch (e: Exception) {
- logger.warn("Failed to create payment ES event for $paymentId", e)
+
+ CompletableFuture
+ .runAsync(
+ {
+ startedCounter.increment()
+ paymentESService.create {
+ it.create(paymentId, orderId, amount)
}
+ },
+ paymentExecutor,
+ )
+ .thenRunAsync(
+ {
+ inExecTimer.record(now() - createdAt, TimeUnit.MILLISECONDS)
+ paymentService.submitPaymentRequest(paymentId, amount, createdAt, deadline)
+ },
+ paymentExecutor,
+ )
+ .whenComplete { _, ex ->
+ if (ex != null) {
+ logger.warn("process stopped before submit for payment: $paymentId", ex)
}
- inExecTimer.record(now()-createdAt, TimeUnit.MILLISECONDS)
- paymentService.submitPaymentRequest(paymentId, amount, createdAt, deadline)
- } finally {
completedCounter.increment()
}
- }
+
return createdAt
}
}
\ No newline at end of file
diff --git a/src/main/kotlin/ru/quipy/payments/logic/PaymentExternalServiceImpl.kt b/src/main/kotlin/ru/quipy/payments/logic/PaymentExternalServiceImpl.kt
index 6590e20c7..fe4ed0cac 100644
--- a/src/main/kotlin/ru/quipy/payments/logic/PaymentExternalServiceImpl.kt
+++ b/src/main/kotlin/ru/quipy/payments/logic/PaymentExternalServiceImpl.kt
@@ -2,6 +2,7 @@ package ru.quipy.payments.logic
import com.fasterxml.jackson.databind.ObjectMapper
import com.fasterxml.jackson.module.kotlin.registerKotlinModule
+import io.github.resilience4j.circuitbreaker.CircuitBreaker
import io.micrometer.core.instrument.MeterRegistry
import io.micrometer.core.instrument.Tag
import kotlinx.coroutines.CoroutineScope
@@ -41,7 +42,8 @@ class PaymentExternalSystemAdapterImpl(
private val token: String,
meterRegistry: MeterRegistry,
private val parallelLimiter: Semaphore,
- private val rateLimiter: SlidingWindowRateLimiter
+ private val rateLimiter: SlidingWindowRateLimiter,
+ private val circuitBreaker: CircuitBreaker,
) : PaymentExternalSystemAdapter {
private val serviceName = properties.serviceName
@@ -80,24 +82,6 @@ class PaymentExternalSystemAdapterImpl(
listOf(Tag.of("accountName", properties.accountName)),
retryRequests
) { it.toDouble() }
-
- try {
- val warmupUri = URI("http://$paymentProviderHostPort/external/accounts?serviceName=$serviceName&token=$token")
- val warmupRequest = HttpRequest.newBuilder()
- .uri(warmupUri)
- .timeout(Duration.ofSeconds(5))
- .GET()
- .build()
- val futures = (1..10).map {
- httpClient.sendAsync(warmupRequest, HttpResponse.BodyHandlers.ofString())
- }
- futures.forEach { future ->
- try { future.join() } catch (_: Exception) {}
- }
- logger.info("[$accountName] HTTP client warmup complete")
- } catch (e: Exception) {
- logger.warn("[$accountName] HTTP warmup failed (non-fatal)", e)
- }
}
@@ -131,6 +115,17 @@ class PaymentExternalSystemAdapterImpl(
throw TooManyRequestsException(deadline)
}
+ if (!circuitBreaker.tryAcquirePermission()) {
+ try {
+ paymentESService.update(paymentId) {
+ it.logProcessing(false, now(), transactionId, "Circuit breaker rejected (no permission)")
+ }
+ } catch (e: Exception) {
+ logger.error("accountName: $accountName. Failed to record CB reject for $paymentId", e)
+ }
+ return
+ }
+
parallelLimiter.acquire()
sendAttempt(0, request, paymentId, transactionId, deadline, completed, allowHedge = true)
@@ -155,7 +150,13 @@ class PaymentExternalSystemAdapterImpl(
return@schedule
}
+ if (!circuitBreaker.tryAcquirePermission()) {
+ logger.debug("accountName: $accountName. Hedge skipped: CB no permission for $paymentId")
+ return@schedule
+ }
+
parallelLimiter.acquire()
+
sendAttempt(
0,
hedgeRequest,
@@ -200,7 +201,34 @@ class PaymentExternalSystemAdapterImpl(
if (retryCount > 0) {
retryRequests.decrementAndGet()
}
- timer.record(now() - timeBeforeCall, TimeUnit.MILLISECONDS)
+
+ val callDuration = now() - timeBeforeCall
+ timer.record(callDuration, TimeUnit.MILLISECONDS)
+
+ if (throwable != null) {
+ circuitBreaker.onError(callDuration, TimeUnit.MILLISECONDS, throwable)
+ } else {
+ try {
+ val rawBody = response.body()
+ val parsed = try {
+ mapper.readValue(rawBody, ExternalSysResponse::class.java)
+ } catch (ex: Exception) {
+ ExternalSysResponse(transactionId.toString(), paymentId.toString(), false, ex.message)
+ }
+
+ if (parsed.result) {
+ circuitBreaker.onSuccess(callDuration, TimeUnit.MILLISECONDS)
+ } else {
+ circuitBreaker.onError(
+ callDuration,
+ TimeUnit.MILLISECONDS,
+ RuntimeException(parsed.message ?: "Payment rejected"),
+ )
+ }
+ } catch (e: Exception) {
+ circuitBreaker.onError(callDuration, TimeUnit.MILLISECONDS, e)
+ }
+ }
if (completed.get()) {
return@whenComplete
@@ -338,6 +366,12 @@ class PaymentExternalSystemAdapterImpl(
// }
return@schedule
}
+
+ if (!circuitBreaker.tryAcquirePermission()) {
+ logger.debug("accountName: $accountName. Retry skipped: CB no permission for $paymentId")
+ return@schedule
+ }
+
parallelLimiter.acquire()
sendAttempt(
retryCount + 1,