Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 4 additions & 1 deletion .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -32,4 +32,7 @@ build/
!**/src/test/**/build/

### VS Code ###
.vscode/
.vscode/

grafana/
prometheus/
5 changes: 5 additions & 0 deletions pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,11 @@
<artifactId>resilience4j-ratelimiter</artifactId>
<version>${resilience4jVersion}</version>
</dependency>
<dependency>
<groupId>io.github.resilience4j</groupId>
<artifactId>resilience4j-circuitbreaker</artifactId>
<version>${resilience4jVersion}</version>
</dependency>
<dependency>
<groupId>org.jetbrains.kotlinx</groupId>
<artifactId>kotlinx-coroutines-core</artifactId>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -152,13 +152,14 @@ class PaymentAccountsConfig {
.onEach(::println)
.map {
PaymentExternalSystemAdapterImpl(
it,
paymentService,
paymentProviderHostPort,
token,
meterRegistry,
parallelLimiter(accountProperties),
rateLimiter = smoothOutIncoming(accountProperties)
properties = it,
paymentESService = paymentService,
paymentProviderHostPort = paymentProviderHostPort,
token = token,
meterRegistry = meterRegistry,
parallelLimiter = parallelLimiter(accountProperties),
rateLimiter = smoothOutIncoming(accountProperties),
circuitBreaker = PaymentCircuitBreakerFactory.forAccount(accountProperties.first())
)
}
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
package ru.quipy.payments.config

import io.github.resilience4j.circuitbreaker.CircuitBreaker
import io.github.resilience4j.circuitbreaker.CircuitBreakerConfig
import ru.quipy.payments.logic.PaymentAccountProperties
import java.time.Duration

object PaymentCircuitBreakerFactory {

fun forAccount(props: PaymentAccountProperties): CircuitBreaker {
val avgMs = props.averageProcessingTime.toMillis().coerceAtLeast(1L)
val slowThreshold = Duration.ofMillis((avgMs * 5).coerceAtMost(3000L))

val config = CircuitBreakerConfig.custom()
.slidingWindowType(CircuitBreakerConfig.SlidingWindowType.COUNT_BASED)
.slidingWindowSize(20)
.minimumNumberOfCalls(5)
.failureRateThreshold(50f)
.slowCallRateThreshold(80f)
.slowCallDurationThreshold(slowThreshold)
.waitDurationInOpenState(Duration.ofSeconds(3))
.permittedNumberOfCallsInHalfOpenState(5)
.automaticTransitionFromOpenToHalfOpenEnabled(true)
.build()

return CircuitBreaker.of("cb-${props.accountName}", config)
}
}
40 changes: 22 additions & 18 deletions src/main/kotlin/ru/quipy/payments/logic/OrderPayer.kt
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import ru.quipy.common.utils.SlidingWindowRateLimiter
import ru.quipy.core.EventSourcingService
import ru.quipy.payments.api.PaymentAggregate
import java.util.*
import java.util.concurrent.CompletableFuture
import java.util.concurrent.ThreadPoolExecutor
import java.util.concurrent.TimeUnit

Expand Down Expand Up @@ -42,28 +43,31 @@ class OrderPayer(
fun processPayment(orderId: UUID, amount: Int, paymentId: UUID, deadline: Long): Long {
val createdAt = System.currentTimeMillis()
plannedCounter.increment()
paymentExecutor.submit {
startedCounter.increment()
try {
val esFuture = java.util.concurrent.CompletableFuture.runAsync {
try {
paymentESService.create {
it.create(
paymentId,
orderId,
amount
)
}
} catch (e: Exception) {
logger.warn("Failed to create payment ES event for $paymentId", e)

CompletableFuture
.runAsync(
{
startedCounter.increment()
paymentESService.create {
it.create(paymentId, orderId, amount)
}
},
paymentExecutor,
)
.thenRunAsync(
{
inExecTimer.record(now() - createdAt, TimeUnit.MILLISECONDS)
paymentService.submitPaymentRequest(paymentId, amount, createdAt, deadline)
},
paymentExecutor,
)
.whenComplete { _, ex ->
if (ex != null) {
logger.warn("process stopped before submit for payment: $paymentId", ex)
}
inExecTimer.record(now()-createdAt, TimeUnit.MILLISECONDS)
paymentService.submitPaymentRequest(paymentId, amount, createdAt, deadline)
} finally {
completedCounter.increment()
}
}

return createdAt
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package ru.quipy.payments.logic

import com.fasterxml.jackson.databind.ObjectMapper
import com.fasterxml.jackson.module.kotlin.registerKotlinModule
import io.github.resilience4j.circuitbreaker.CircuitBreaker
import io.micrometer.core.instrument.MeterRegistry
import io.micrometer.core.instrument.Tag
import kotlinx.coroutines.CoroutineScope
Expand Down Expand Up @@ -41,7 +42,8 @@ class PaymentExternalSystemAdapterImpl(
private val token: String,
meterRegistry: MeterRegistry,
private val parallelLimiter: Semaphore,
private val rateLimiter: SlidingWindowRateLimiter
private val rateLimiter: SlidingWindowRateLimiter,
private val circuitBreaker: CircuitBreaker,
) : PaymentExternalSystemAdapter {

private val serviceName = properties.serviceName
Expand Down Expand Up @@ -80,24 +82,6 @@ class PaymentExternalSystemAdapterImpl(
listOf(Tag.of("accountName", properties.accountName)),
retryRequests
) { it.toDouble() }

try {
val warmupUri = URI("http://$paymentProviderHostPort/external/accounts?serviceName=$serviceName&token=$token")
val warmupRequest = HttpRequest.newBuilder()
.uri(warmupUri)
.timeout(Duration.ofSeconds(5))
.GET()
.build()
val futures = (1..10).map {
httpClient.sendAsync(warmupRequest, HttpResponse.BodyHandlers.ofString())
}
futures.forEach { future ->
try { future.join() } catch (_: Exception) {}
}
logger.info("[$accountName] HTTP client warmup complete")
} catch (e: Exception) {
logger.warn("[$accountName] HTTP warmup failed (non-fatal)", e)
}
}


Expand Down Expand Up @@ -131,6 +115,17 @@ class PaymentExternalSystemAdapterImpl(
throw TooManyRequestsException(deadline)
}

if (!circuitBreaker.tryAcquirePermission()) {
try {
paymentESService.update(paymentId) {
it.logProcessing(false, now(), transactionId, "Circuit breaker rejected (no permission)")
}
} catch (e: Exception) {
logger.error("accountName: $accountName. Failed to record CB reject for $paymentId", e)
}
return
}

parallelLimiter.acquire()
sendAttempt(0, request, paymentId, transactionId, deadline, completed, allowHedge = true)

Expand All @@ -155,7 +150,13 @@ class PaymentExternalSystemAdapterImpl(
return@schedule
}

if (!circuitBreaker.tryAcquirePermission()) {
logger.debug("accountName: $accountName. Hedge skipped: CB no permission for $paymentId")
return@schedule
}

parallelLimiter.acquire()

sendAttempt(
0,
hedgeRequest,
Expand Down Expand Up @@ -200,7 +201,34 @@ class PaymentExternalSystemAdapterImpl(
if (retryCount > 0) {
retryRequests.decrementAndGet()
}
timer.record(now() - timeBeforeCall, TimeUnit.MILLISECONDS)

val callDuration = now() - timeBeforeCall
timer.record(callDuration, TimeUnit.MILLISECONDS)

if (throwable != null) {
circuitBreaker.onError(callDuration, TimeUnit.MILLISECONDS, throwable)
} else {
try {
val rawBody = response.body()
val parsed = try {
mapper.readValue(rawBody, ExternalSysResponse::class.java)
} catch (ex: Exception) {
ExternalSysResponse(transactionId.toString(), paymentId.toString(), false, ex.message)
}

if (parsed.result) {
circuitBreaker.onSuccess(callDuration, TimeUnit.MILLISECONDS)
} else {
circuitBreaker.onError(
callDuration,
TimeUnit.MILLISECONDS,
RuntimeException(parsed.message ?: "Payment rejected"),
)
}
} catch (e: Exception) {
circuitBreaker.onError(callDuration, TimeUnit.MILLISECONDS, e)
}
}

if (completed.get()) {
return@whenComplete
Expand Down Expand Up @@ -338,6 +366,12 @@ class PaymentExternalSystemAdapterImpl(
// }
return@schedule
}

if (!circuitBreaker.tryAcquirePermission()) {
logger.debug("accountName: $accountName. Retry skipped: CB no permission for $paymentId")
return@schedule
}

parallelLimiter.acquire()
sendAttempt(
retryCount + 1,
Expand Down