From eaa9b3d980b36a9daf586c8ffaf8be0cf3ece932 Mon Sep 17 00:00:00 2001 From: vaycheslav Date: Tue, 11 Nov 2025 17:53:53 +0300 Subject: [PATCH 1/4] feature: added retries for Temporary error --- Dockerfile | 2 +- .../dashboards/ServicesStatistic.json | 407 +++++++----------- .../ru/quipy/apigateway/APIController.kt | 8 +- .../apigateway/GlobalExceptionHandler.kt | 18 +- .../ru/quipy/config/RpcControlConfig.kt | 11 - .../exceptions/TooLongRequestEcexption.kt | 4 + .../TooManyRequestsRetriableException.kt | 4 + .../payments/config/PaymentAccountsConfig.kt | 11 +- .../ru/quipy/payments/logic/OrderPayer.kt | 29 +- .../logic/PaymentExternalServiceImpl.kt | 102 +++-- src/main/resources/application.properties | 9 +- 11 files changed, 290 insertions(+), 315 deletions(-) create mode 100644 src/main/kotlin/ru/quipy/exceptions/TooLongRequestEcexption.kt create mode 100644 src/main/kotlin/ru/quipy/exceptions/TooManyRequestsRetriableException.kt diff --git a/Dockerfile b/Dockerfile index cc6f2e042..3d9587cd0 100644 --- a/Dockerfile +++ b/Dockerfile @@ -6,7 +6,7 @@ RUN mvn dependency:go-offline COPY src src RUN mvn package -FROM openjdk:17-jdk-slim +FROM eclipse-temurin:17-alpine-3.22 COPY --from=build /app/target/*.jar /high-load-course.jar diff --git a/grafana/provisioning/dashboards/ServicesStatistic.json b/grafana/provisioning/dashboards/ServicesStatistic.json index 74e9740f0..eb6f028d2 100644 --- a/grafana/provisioning/dashboards/ServicesStatistic.json +++ b/grafana/provisioning/dashboards/ServicesStatistic.json @@ -1345,7 +1345,6 @@ "uid": "PBFA97CFB590B2093" }, "editorMode": "code", - "exemplar": true, "expr": "rate(stage_duration_ok_sum{service=~\"$service\", result=\"success\"}[1m]) / rate(stage_duration_ok_count{service=~\"$service\", result=\"success\"}[1m])", "interval": "", "legendFormat": "{{stage}}", @@ -1785,7 +1784,7 @@ "exemplar": true, "expr": "rate(external_sys_duration_count{service=~\"$service\"}[30s])", "hide": false, - "interval": "", + "instant": false, "legendFormat": "{{accountName}} - {{outcome}}", "range": true, "refId": "C" @@ -1923,7 +1922,7 @@ "exemplar": true, "expr": "rate(external_sys_duration_sum{service=~\"$service\"}[1m]) / rate(external_sys_duration_count{service=~\"$service\"}[1m])", "hide": false, - "interval": "", + "instant": false, "legendFormat": "{{accountName}} - {{outcome}}", "range": true, "refId": "C" @@ -2024,37 +2023,13 @@ } } ] - }, - { - "__systemRef": "hideSeriesFrom", - "matcher": { - "id": "byNames", - "options": { - "mode": "exclude", - "names": [ - "payOrder - " - ], - "prefix": "All except:", - "readOnly": true - } - }, - "properties": [ - { - "id": "custom.hideFrom", - "value": { - "legend": false, - "tooltip": false, - "viz": true - } - } - ] } ] }, "gridPos": { "h": 8, "w": 12, - "x": 12, + "x": 0, "y": 108 }, "id": 43, @@ -2065,9 +2040,7 @@ ], "displayMode": "table", "placement": "right", - "showLegend": true, - "sortBy": "Max", - "sortDesc": true + "showLegend": true }, "tooltip": { "mode": "single", @@ -2082,7 +2055,7 @@ "uid": "PBFA97CFB590B2093" }, "editorMode": "code", - "expr": "rate(http_request_latent_sum{service=~\"$service\"}[15s]) / rate(http_request_latent_count{service=~\"$service\"}[15s])", + "expr": "sum(rate(http_request_latent_sum{service=~\"$service\"}[15s]) / rate(http_request_latent_count{service=~\"$service\"}[15s]))", "hide": false, "instant": false, "legendFormat": "{{method}} - {{result}}", @@ -2390,7 +2363,6 @@ "uid": "PBFA97CFB590B2093" }, "editorMode": "code", - "exemplar": true, "expr": "sum by (name) (executor_active_threads{})", "interval": "", "legendFormat": "{{name}}", @@ -2665,210 +2637,6 @@ }, "overrides": [] }, - "gridPos": { - "h": 8, - "w": 12, - "x": 0, - "y": 38 - }, - "id": 95, - "options": { - "legend": { - "calcs": [ - "max" - ], - "displayMode": "table", - "placement": "right", - "showLegend": true - }, - "tooltip": { - "mode": "multi", - "sort": "none" - } - }, - "pluginVersion": "11.4.0", - "targets": [ - { - "datasource": { - "type": "prometheus", - "uid": "PBFA97CFB590B2093" - }, - "editorMode": "code", - "expr": "sum by (job) (system_cpu_usage{service=~\"$service\"})", - "format": "time_series", - "intervalFactor": 1, - "legendFormat": "{{job}} - System CPU Usage", - "range": true, - "refId": "A" - }, - { - "datasource": { - "type": "prometheus", - "uid": "PBFA97CFB590B2093" - }, - "editorMode": "code", - "expr": "sum by (job) (process_cpu_usage{service=~\"$service\"})", - "format": "time_series", - "intervalFactor": 1, - "legendFormat": "{{job}} -Process CPU Usage", - "range": true, - "refId": "B" - } - ], - "title": "CPU Usage", - "type": "timeseries" - }, - { - "datasource": { - "type": "prometheus", - "uid": "PBFA97CFB590B2093" - }, - "fieldConfig": { - "defaults": { - "color": { - "mode": "palette-classic" - }, - "custom": { - "axisBorderShow": false, - "axisCenteredZero": false, - "axisColorMode": "text", - "axisLabel": "", - "axisPlacement": "auto", - "barAlignment": 0, - "barWidthFactor": 0.6, - "drawStyle": "line", - "fillOpacity": 17, - "gradientMode": "opacity", - "hideFrom": { - "legend": false, - "tooltip": false, - "viz": false - }, - "insertNulls": false, - "lineInterpolation": "linear", - "lineWidth": 1, - "pointSize": 5, - "scaleDistribution": { - "type": "linear" - }, - "showPoints": "auto", - "spanNulls": false, - "stacking": { - "group": "A", - "mode": "none" - }, - "thresholdsStyle": { - "mode": "off" - } - }, - "mappings": [], - "thresholds": { - "mode": "absolute", - "steps": [ - { - "color": "green", - "value": null - } - ] - }, - "unit": "s" - }, - "overrides": [] - }, - "gridPos": { - "h": 8, - "w": 12, - "x": 12, - "y": 38 - }, - "id": 42, - "options": { - "legend": { - "calcs": [], - "displayMode": "table", - "placement": "right", - "showLegend": true - }, - "tooltip": { - "mode": "single", - "sort": "none" - } - }, - "pluginVersion": "11.4.0", - "targets": [ - { - "datasource": { - "type": "prometheus", - "uid": "PBFA97CFB590B2093" - }, - "editorMode": "code", - "expr": "sum(rate(jvm_gc_pause_seconds_sum{}[5m])) by (job)", - "instant": false, - "legendFormat": "{{job}}", - "range": true, - "refId": "A" - } - ], - "title": "GC delay", - "type": "timeseries" - }, - { - "datasource": { - "type": "prometheus", - "uid": "PBFA97CFB590B2093" - }, - "fieldConfig": { - "defaults": { - "color": { - "mode": "palette-classic" - }, - "custom": { - "axisBorderShow": false, - "axisCenteredZero": false, - "axisColorMode": "text", - "axisLabel": "", - "axisPlacement": "auto", - "barAlignment": 0, - "barWidthFactor": 0.6, - "drawStyle": "line", - "fillOpacity": 17, - "gradientMode": "opacity", - "hideFrom": { - "legend": false, - "tooltip": false, - "viz": false - }, - "insertNulls": false, - "lineInterpolation": "linear", - "lineWidth": 1, - "pointSize": 5, - "scaleDistribution": { - "type": "linear" - }, - "showPoints": "auto", - "spanNulls": false, - "stacking": { - "group": "A", - "mode": "none" - }, - "thresholdsStyle": { - "mode": "off" - } - }, - "mappings": [], - "thresholds": { - "mode": "absolute", - "steps": [ - { - "color": "green", - "value": null - } - ] - }, - "unit": "short" - }, - "overrides": [] - }, "gridPos": { "h": 8, "w": 12, @@ -2916,14 +2684,14 @@ "h": 1, "w": 24, "x": 0, - "y": 6 + "y": 7 }, "id": 97, "panels": [ { "datasource": { "type": "prometheus", - "uid": "PBFA97CFB590B2093" + "uid": "P1C574E4B1E20B3B3" }, "fieldConfig": { "defaults": { @@ -3007,7 +2775,7 @@ { "datasource": { "type": "prometheus", - "uid": "PBFA97CFB590B2093" + "uid": "P1C574E4B1E20B3B3" }, "editorMode": "code", "expr": "sum by (job) (jetty_threads_config_max{})", @@ -3025,7 +2793,7 @@ { "datasource": { "type": "prometheus", - "uid": "PBFA97CFB590B2093" + "uid": "P1C574E4B1E20B3B3" }, "fieldConfig": { "defaults": { @@ -3109,7 +2877,7 @@ { "datasource": { "type": "prometheus", - "uid": "PBFA97CFB590B2093" + "uid": "P1C574E4B1E20B3B3" }, "editorMode": "code", "expr": "sum by (job) (jetty_threads_config_min{service=~\"$service\"})", @@ -3127,7 +2895,7 @@ { "datasource": { "type": "prometheus", - "uid": "PBFA97CFB590B2093" + "uid": "P1C574E4B1E20B3B3" }, "fieldConfig": { "defaults": { @@ -3300,7 +3068,7 @@ "spanNulls": false, "stacking": { "group": "A", - "mode": "none" + "mode": "normal" }, "thresholdsStyle": { "mode": "off" @@ -3400,13 +3168,12 @@ "spanNulls": false, "stacking": { "group": "A", - "mode": "none" + "mode": "normal" }, "thresholdsStyle": { "mode": "off" } }, - "links": [], "mappings": [], "thresholds": { "mode": "absolute", @@ -3421,7 +3188,7 @@ } ] }, - "unit": "s" + "unit": "none" }, "overrides": [] }, @@ -3820,8 +3587,147 @@ ], "title": "Payment Processing - Active Tasks", "type": "row" - } - ], + }, + { + "datasource": { + "type": "prometheus", + "uid": "PBFA97CFB590B2093" + }, + "fieldConfig": { + "defaults": { + "color": { + "mode": "palette-classic" + }, + "custom": { + "axisBorderShow": false, + "axisCenteredZero": false, + "axisColorMode": "text", + "axisLabel": "", + "axisPlacement": "auto", + "barAlignment": 0, + "drawStyle": "line", + "fillOpacity": 0, + "gradientMode": "none", + "hideFrom": { + "legend": false, + "tooltip": false, + "viz": false + }, + "insertNulls": false, + "lineInterpolation": "linear", + "lineWidth": 1, + "pointSize": 5, + "scaleDistribution": { + "type": "linear" + }, + "showPoints": "auto", + "spanNulls": false, + "stacking": { + "group": "A", + "mode": "none" + }, + "thresholdsStyle": { + "mode": "off" + } + }, + "mappings": [], + "thresholds": { + "mode": "absolute", + "steps": [ + { + "color": "green", + "value": null + }, + { + "color": "red", + "value": 80 + } + ] + } + }, + "overrides": [] + }, + "gridPos": { + "h": 9, + "w": 24, + "x": 0, + "y": 0 + }, + "id": 108, + "options": { + "legend": { + "calcs": [], + "displayMode": "list", + "placement": "bottom", + "showLegend": true + }, + "tooltip": { + "mode": "single", + "sort": "none" + } + }, + "targets": [ + { + "datasource": { + "type": "prometheus", + "uid": "PBFA97CFB590B2093" + }, + "editorMode": "code", + "expr": "histogram_quantile(0.75, sum(rate(payment_external_system_request_latency_seconds_bucket[1m])) by (le, accountName))", + "legendFormat": "p75 {{accountName}}", + "range": true, + "refId": "A" + }, + { + "datasource": { + "type": "prometheus", + "uid": "PBFA97CFB590B2093" + }, + "editorMode": "code", + "expr": "histogram_quantile(0.80, sum(rate(payment_external_system_request_latency_seconds_bucket[1m])) by (le, accountName))", + "legendFormat": "p80 {{accountName}}", + "range": true, + "refId": "B" + }, + { + "datasource": { + "type": "prometheus", + "uid": "PBFA97CFB590B2093" + }, + "editorMode": "code", + "expr": "histogram_quantile(0.85, sum(rate(payment_external_system_request_latency_seconds_bucket[1m])) by (le, accountName))", + "legendFormat": "p85 {{accountName}}", + "range": true, + "refId": "C" + }, + { + "datasource": { + "type": "prometheus", + "uid": "PBFA97CFB590B2093" + }, + "editorMode": "code", + "expr": "histogram_quantile(0.90, sum(rate(payment_external_system_request_latency_seconds_bucket[1m])) by (le, accountName))", + "hide": false, + "legendFormat": "p90 {{accountName}}", + "range": true, + "refId": "D" + }, + { + "datasource": { + "type": "prometheus", + "uid": "PBFA97CFB590B2093" + }, + "editorMode": "code", + "expr": "histogram_quantile(0.95, sum(rate(payment_external_system_request_latency_seconds_bucket[1m])) by (le, accountName))", + "hide": false, + "legendFormat": "p95 {{accountName}}", + "range": true, + "refId": "E" + } + ], + "title": "Payment Processing Latency", + "type": "timeseries" + }], "preload": false, "refresh": "5s", "schemaVersion": 40, @@ -3868,4 +3774,5 @@ "uid": "KVr-Vmpnz", "version": 4, "weekStart": "" -} \ No newline at end of file +} + diff --git a/src/main/kotlin/ru/quipy/apigateway/APIController.kt b/src/main/kotlin/ru/quipy/apigateway/APIController.kt index 1a92d7863..fff5ac386 100644 --- a/src/main/kotlin/ru/quipy/apigateway/APIController.kt +++ b/src/main/kotlin/ru/quipy/apigateway/APIController.kt @@ -7,20 +7,18 @@ import org.springframework.web.bind.annotation.* import ru.quipy.common.utils.LeakingBucketRateLimiter import ru.quipy.common.utils.RateLimiter import ru.quipy.exceptions.TooManyRequestsException +import ru.quipy.exceptions.TooManyRequestsRetriableException import ru.quipy.orders.repository.OrderRepository import ru.quipy.payments.logic.OrderPayer import java.time.Duration import java.util.* -import java.util.concurrent.Semaphore -import java.util.concurrent.TimeUnit @RestController class APIController( private val orderRepository: OrderRepository, private val orderPayer: OrderPayer, @field:Qualifier("parallelLimiter") - private val parallelLimiter: Semaphore, - private val rateLimiter: RateLimiter = LeakingBucketRateLimiter(10, Duration.ofSeconds(1), 20) + private val rateLimiter: RateLimiter = LeakingBucketRateLimiter(8, Duration.ofSeconds(1), 38) ) { val logger: Logger = LoggerFactory.getLogger(APIController::class.java) @@ -62,7 +60,7 @@ class APIController( @PostMapping("/orders/{orderId}/payment") fun payOrder(@PathVariable orderId: UUID, @RequestParam deadline: Long): PaymentSubmissionDto { - if (!rateLimiter.tick() || !parallelLimiter.tryAcquire(1, TimeUnit.SECONDS)) { + if (!rateLimiter.tick()) { throw TooManyRequestsException(deadline) } val paymentId = UUID.randomUUID() diff --git a/src/main/kotlin/ru/quipy/apigateway/GlobalExceptionHandler.kt b/src/main/kotlin/ru/quipy/apigateway/GlobalExceptionHandler.kt index 1fede48bf..bd10eab7f 100644 --- a/src/main/kotlin/ru/quipy/apigateway/GlobalExceptionHandler.kt +++ b/src/main/kotlin/ru/quipy/apigateway/GlobalExceptionHandler.kt @@ -4,7 +4,10 @@ import org.slf4j.LoggerFactory import org.springframework.http.HttpStatus import org.springframework.http.ResponseEntity import org.springframework.web.bind.annotation.ExceptionHandler +import org.springframework.web.bind.annotation.ResponseStatus import org.springframework.web.bind.annotation.RestControllerAdvice +import ru.quipy.exceptions.TooManyRequestsRetriableException +import ru.quipy.exceptions.TooLongRequestException import ru.quipy.exceptions.TooManyRequestsException import java.util.concurrent.atomic.AtomicInteger import java.util.concurrent.atomic.AtomicLong @@ -22,9 +25,12 @@ class GlobalExceptionHandler( private val rejectedRequestsCount = AtomicInteger(0) private val lastRejectionTime = AtomicLong(0) - - @ExceptionHandler(TooManyRequestsException::class) - fun handleTooManyRequests(exception: TooManyRequestsException): ResponseEntity { + @ExceptionHandler(TooLongRequestException::class) + fun handleTooManyRequests(exception: TooLongRequestException): ResponseEntity { + return ResponseEntity.status(200).body("your request very long, i am so sorry") + } + @ExceptionHandler(TooManyRequestsRetriableException::class) + fun handleTooManyRequestsRetriable(exception: TooManyRequestsRetriableException): ResponseEntity { logger.warn("to many request") val currentTime = System.currentTimeMillis() val lastRejection = lastRejectionTime.get() @@ -44,4 +50,10 @@ class GlobalExceptionHandler( return ResponseEntity.status(200).build() } + + @ExceptionHandler(TooManyRequestsException::class) + @ResponseStatus(HttpStatus.TOO_MANY_REQUESTS) + fun handleTooManyRequests(exception: TooManyRequestsException) { + logger.warn("Handling TooManyRequestsException: ${exception.message}") + } } \ No newline at end of file diff --git a/src/main/kotlin/ru/quipy/config/RpcControlConfig.kt b/src/main/kotlin/ru/quipy/config/RpcControlConfig.kt index ec2747436..097d43cef 100644 --- a/src/main/kotlin/ru/quipy/config/RpcControlConfig.kt +++ b/src/main/kotlin/ru/quipy/config/RpcControlConfig.kt @@ -3,21 +3,10 @@ package ru.quipy.config import org.springframework.beans.factory.annotation.Qualifier import org.springframework.context.annotation.Bean import org.springframework.context.annotation.Configuration -import ru.quipy.common.utils.CompositeRateLimiter -import ru.quipy.common.utils.RateLimiter -import ru.quipy.common.utils.SlidingWindowRateLimiter -import ru.quipy.common.utils.TokenBucketRateLimiter import ru.quipy.payments.logic.PaymentAccountProperties -import java.time.Duration import java.util.concurrent.Semaphore -import java.util.concurrent.TimeUnit @Configuration class RpcControlConfig { - @Bean - @Qualifier("parallelLimiter") - fun parallelLimiter(accountProperties: PaymentAccountProperties): Semaphore { - return Semaphore(accountProperties.parallelRequests) - } } diff --git a/src/main/kotlin/ru/quipy/exceptions/TooLongRequestEcexption.kt b/src/main/kotlin/ru/quipy/exceptions/TooLongRequestEcexption.kt new file mode 100644 index 000000000..c50d9bc48 --- /dev/null +++ b/src/main/kotlin/ru/quipy/exceptions/TooLongRequestEcexption.kt @@ -0,0 +1,4 @@ +package ru.quipy.exceptions + +class TooLongRequestException() : RuntimeException() { +} \ No newline at end of file diff --git a/src/main/kotlin/ru/quipy/exceptions/TooManyRequestsRetriableException.kt b/src/main/kotlin/ru/quipy/exceptions/TooManyRequestsRetriableException.kt new file mode 100644 index 000000000..4c95b4b30 --- /dev/null +++ b/src/main/kotlin/ru/quipy/exceptions/TooManyRequestsRetriableException.kt @@ -0,0 +1,4 @@ +package ru.quipy.exceptions + +class TooManyRequestsRetriableException(val deadline: Long) : RuntimeException(){ +} \ No newline at end of file diff --git a/src/main/kotlin/ru/quipy/payments/config/PaymentAccountsConfig.kt b/src/main/kotlin/ru/quipy/payments/config/PaymentAccountsConfig.kt index 2842a405f..412d3c79e 100644 --- a/src/main/kotlin/ru/quipy/payments/config/PaymentAccountsConfig.kt +++ b/src/main/kotlin/ru/quipy/payments/config/PaymentAccountsConfig.kt @@ -3,6 +3,7 @@ package ru.quipy.payments.config import com.fasterxml.jackson.databind.ObjectMapper import com.fasterxml.jackson.datatype.jsr310.JavaTimeModule import com.fasterxml.jackson.module.kotlin.registerKotlinModule +import io.micrometer.core.instrument.MeterRegistry import org.springframework.beans.factory.annotation.Value import org.springframework.context.annotation.Bean import org.springframework.context.annotation.Configuration @@ -17,6 +18,7 @@ import java.net.http.HttpClient import java.net.http.HttpRequest import java.net.http.HttpResponse import java.util.* +import java.util.concurrent.Semaphore @Configuration @@ -39,7 +41,10 @@ class PaymentAccountsConfig { lateinit var allowedAccounts: List @Bean - fun accountAdapters(paymentService: EventSourcingService): List { + fun accountAdapters( + paymentService: EventSourcingService, + meterRegistry: MeterRegistry, + ): List { val request = HttpRequest.newBuilder() .uri(URI("http://${paymentProviderHostPort}/external/accounts?serviceName=$serviceName&token=$token")) .GET() @@ -60,7 +65,9 @@ class PaymentAccountsConfig { it, paymentService, paymentProviderHostPort, - token + token, + meterRegistry, + Semaphore(it.parallelRequests) ) } } diff --git a/src/main/kotlin/ru/quipy/payments/logic/OrderPayer.kt b/src/main/kotlin/ru/quipy/payments/logic/OrderPayer.kt index fe035cd6a..b1c20c4d5 100644 --- a/src/main/kotlin/ru/quipy/payments/logic/OrderPayer.kt +++ b/src/main/kotlin/ru/quipy/payments/logic/OrderPayer.kt @@ -1,7 +1,11 @@ package ru.quipy.payments.logic import io.micrometer.core.instrument.Counter +import io.micrometer.core.instrument.MeterRegistry import io.micrometer.core.instrument.Metrics +import io.micrometer.core.instrument.Timer +import io.prometheus.metrics.core.metrics.Summary +import io.prometheus.metrics.model.snapshots.Unit import org.slf4j.Logger import org.slf4j.LoggerFactory import org.springframework.beans.factory.annotation.Qualifier @@ -18,14 +22,17 @@ import java.util.concurrent.Semaphore import java.util.concurrent.ThreadPoolExecutor import java.util.concurrent.TimeUnit + @Service class OrderPayer( private val paymentESService: EventSourcingService, private val paymentService: PaymentService, private val accountProperties: PaymentAccountProperties, @field:Qualifier("parallelLimiter") - private val parallelLimiter: Semaphore, -) { + private val meterRegistry: MeterRegistry, + ) { + + private val paymentProcessingPlannedCounter: Counter = Metrics.counter("payment.processing.planned", "accountName", accountProperties.accountName) private val paymentProcessingStartedCounter: Counter = @@ -43,24 +50,17 @@ class OrderPayer( accountProperties.parallelRequests, 0L, TimeUnit.MILLISECONDS, - ArrayBlockingQueue(accountProperties.parallelRequests), + ArrayBlockingQueue(8_000), NamedThreadFactory("payment-submission-executor"), - CallerBlockingRejectedExecutionHandler(maxWait = Duration.ofMillis(1500)) + CallerBlockingRejectedExecutionHandler(maxWait = Duration.ofSeconds(3)) ) } - private val rateLimit: SlidingWindowRateLimiter by lazy { - SlidingWindowRateLimiter( - rate = accountProperties.rateLimitPerSec.toLong(), - window = Duration.ofMillis(1000), - ) - } + fun processPayment(orderId: UUID, amount: Int, paymentId: UUID, deadline: Long): Long { paymentProcessingPlannedCounter.increment() - while (!rateLimit.tick()) { - Thread.sleep(Random().nextLong(1, 10)) - } + val createdAt = System.currentTimeMillis() paymentProcessingStartedCounter.increment() @@ -72,13 +72,10 @@ class OrderPayer( logger.trace("Payment ${createdEvent.paymentId} for order $orderId created.") paymentService.submitPaymentRequest(paymentId, amount, createdAt, deadline) } catch (e: Exception) { - parallelLimiter.release() paymentProcessingCompletedCounter.increment() throw e } finally { - parallelLimiter.release() paymentProcessingCompletedCounter.increment() - } } return createdAt diff --git a/src/main/kotlin/ru/quipy/payments/logic/PaymentExternalServiceImpl.kt b/src/main/kotlin/ru/quipy/payments/logic/PaymentExternalServiceImpl.kt index 5da9c9984..e8594adaa 100644 --- a/src/main/kotlin/ru/quipy/payments/logic/PaymentExternalServiceImpl.kt +++ b/src/main/kotlin/ru/quipy/payments/logic/PaymentExternalServiceImpl.kt @@ -2,15 +2,21 @@ package ru.quipy.payments.logic import com.fasterxml.jackson.databind.ObjectMapper import com.fasterxml.jackson.module.kotlin.registerKotlinModule +import io.micrometer.core.instrument.MeterRegistry +import java.util.concurrent.Semaphore import okhttp3.OkHttpClient import okhttp3.Request import okhttp3.RequestBody import org.slf4j.LoggerFactory +import ru.quipy.common.utils.SlidingWindowRateLimiter import ru.quipy.core.EventSourcingService +import ru.quipy.exceptions.TooManyRequestsException import ru.quipy.payments.api.PaymentAggregate +import java.io.InterruptedIOException import java.net.SocketTimeoutException import java.time.Duration import java.util.* +import java.util.concurrent.TimeUnit import kotlin.math.pow @@ -20,6 +26,8 @@ class PaymentExternalSystemAdapterImpl( private val paymentESService: EventSourcingService, private val paymentProviderHostPort: String, private val token: String, + private val meterRegistry: MeterRegistry, + private val parallelLimiter: Semaphore ) : PaymentExternalSystemAdapter { companion object { @@ -29,13 +37,21 @@ class PaymentExternalSystemAdapterImpl( val mapper = ObjectMapper().registerKotlinModule() } + private val timer = meterRegistry.timer("payment.external.system.request.latency", "accountName", properties.accountName) private val serviceName = properties.serviceName private val accountName = properties.accountName private val requestAverageProcessingTime = properties.averageProcessingTime private val rateLimitPerSec = properties.rateLimitPerSec private val parallelRequests = properties.parallelRequests + private val rateLimit: SlidingWindowRateLimiter by lazy { + SlidingWindowRateLimiter( + rate = rateLimitPerSec.toLong(), + window = Duration.ofMillis(1000), + ) + } - private val client = OkHttpClient.Builder().build() + private val client = OkHttpClient.Builder() + .callTimeout(1100, TimeUnit.MILLISECONDS).build() override fun getAccountProperties(): PaymentAccountProperties { return properties @@ -54,6 +70,16 @@ class PaymentExternalSystemAdapterImpl( logger.info("[$accountName] Submit: $paymentId , txId: $transactionId") + if (!rateLimit.tick()) { + throw TooManyRequestsException(deadline) + } + val exResult = parallelLimiter.tryAcquire(timeToDead(deadline), TimeUnit.MILLISECONDS) + if (!exResult) { + paymentESService.update(paymentId) { + it.logProcessing(false, now(), transactionId, "acquire timeout") + } + } + val timeBeforeCall = now() try { val request = Request.Builder().run { url("http://$paymentProviderHostPort/external/process?serviceName=$serviceName&token=$token&accountName=$accountName&transactionId=$transactionId&paymentId=$paymentId&amount=$amount") @@ -62,36 +88,59 @@ class PaymentExternalSystemAdapterImpl( var isCompletedRequest = false var retryCount = 0 + + if (deadline-now() < requestAverageProcessingTime.toMillis()) { + paymentESService.update(paymentId) { + it.logProcessing(false, now(), transactionId, "deadline was expired") + } + return + } while (!isCompletedRequest && now() < deadline) { - client.newCall(request).execute().use { response -> - val body = try { - mapper.readValue(response.body?.string(), ExternalSysResponse::class.java) - } catch (e: Exception) { - logger.error("[$accountName] [ERROR] Payment processed for txId: $transactionId, payment: $paymentId, result code: ${response.code}, reason: ${response.body?.string()}") - ExternalSysResponse(transactionId.toString(), paymentId.toString(), false, e.message) + if (deadline-now() < requestAverageProcessingTime.toMillis()) { + paymentESService.update(paymentId) { + it.logProcessing(false, now(), transactionId, "deadline was expired") + } + return } - - if (body.message?.contains("Temporary error") == true) { - if (retryCount < 7) { - retryCount++ - val backoffTime = (2.0.pow(retryCount.toDouble()) * 10 + Random().nextLong(0, 10)).toLong() - Thread.sleep(backoffTime) - continue + client.newCall(request).execute().use { response -> + val body = try { + mapper.readValue(response.body?.string(), ExternalSysResponse::class.java) + } catch (e: Exception) { + logger.error("[$accountName] [ERROR] Payment processed for txId: $transactionId, payment: $paymentId, result code: ${response.code}, reason: ${response.body?.string()}") + ExternalSysResponse( + transactionId.toString(), + paymentId.toString(), + false, + e.message + ) + } + isCompletedRequest = if (!body.result && !(response.code == 429 || response.code in 500..504)) { + if (retryCount < 7) { + retryCount++ + val backoffTime = (2.0.pow(retryCount.toDouble()) * 10 + Random().nextLong(0, 10)).toLong() + Thread.sleep(backoffTime) + continue + } else { + true + } } else { - isCompletedRequest = true + true } - } else { - isCompletedRequest = true - } - logger.warn("[$accountName] Payment processed for txId: $transactionId, payment: $paymentId, succeeded: ${body.result}, message: ${body.message}") + logger.warn("[$accountName] Payment processed for txId: $transactionId, payment: $paymentId, succeeded: ${body.result}, message: ${body.message}") - // Здесь мы обновляем состояние оплаты в зависимости от результата в базе данных оплат. - // Это требуется сделать ВО ВСЕХ ИСХОДАХ (успешная оплата / неуспешная / ошибочная ситуация) - paymentESService.update(paymentId) { - it.logProcessing(body.result, now(), transactionId, reason = body.message) + // Здесь мы обновляем состояние оплаты в зависимости от результата в базе данных оплат. + // Это требуется сделать ВО ВСЕХ ИСХОДАХ (успешная оплата / неуспешная / ошибочная ситуация) + paymentESService.update(paymentId) { + it.logProcessing(body.result, now(), transactionId, reason = body.message) + } } } + + if (deadline < now()) { + paymentESService.update(paymentId) { + it.logProcessing(false, now(), transactionId, "deadline was expired") + } } } catch (e: Exception) { when (e) { @@ -109,6 +158,9 @@ class PaymentExternalSystemAdapterImpl( } } } + } finally { + timer.record(now()-timeBeforeCall, TimeUnit.MILLISECONDS) + parallelLimiter.release() } } @@ -119,5 +171,7 @@ class PaymentExternalSystemAdapterImpl( override fun name() = properties.accountName } - +public fun timeToDead(deadline: Long): Long { + return deadline - now() +} public fun now() = System.currentTimeMillis() \ No newline at end of file diff --git a/src/main/resources/application.properties b/src/main/resources/application.properties index 73440a6fd..3170b6dad 100644 --- a/src/main/resources/application.properties +++ b/src/main/resources/application.properties @@ -15,14 +15,17 @@ event.sourcing.sagas-enabled=false spring.datasource.hikari.jdbc-url=jdbc:postgresql://${POSTGRES_ADDRESS:localhost}:${POSTGRES_PORT:65432}/postgres spring.datasource.hikari.username=tiny_es spring.datasource.hikari.password=tiny_es + spring.datasource.hikari.leak-detection-threshold=2000 management.metrics.web.server.request.autotime.percentiles=0.95 management.metrics.export.prometheus.enabled=true -management.endpoints.web.exposure.include=info,health,prometheus,metrics +management.endpoints.web.exposure.include=prometheus,health,info +management.metrics.distribution.percentiles-histogram.http.server.requests=true +management.metrics.distribution.percentiles-histogram.payment.external.system.request.latency=true payment.service-name=${PAYMENT_SERVICE_NAME} payment.token=${PAYMENT_TOKEN} # payment.accounts=${PAYMENT_ACCOUNTS:acc-12,acc-20} # payment.accounts=${PAYMENT_ACCOUNTS:acc-3} -payment.accounts=${PAYMENT_ACCOUNTS:acc-8} +payment.accounts=${PAYMENT_ACCOUNTS:acc-7} # payment.accounts=${PAYMENT_ACCOUNTS:acc-18} -payment.hostPort=${PAYMENT_HOST:localhost}:${PAYMENT_PORT:1234} \ No newline at end of file +payment.hostPort=${PAYMENT_HOST:localhost}:${PAYMENT_PORT:1234} From fd3e8c2bc0f6cfe6ac9db82651405e92a7568949 Mon Sep 17 00:00:00 2001 From: vaycheslav Date: Tue, 11 Nov 2025 18:47:47 +0300 Subject: [PATCH 2/4] fix:added timeout for window --- .../ru/quipy/common/utils/SlidingWindowRateLimiter.kt | 11 +++++++++++ .../payments/logic/PaymentExternalServiceImpl.kt | 6 +++--- 2 files changed, 14 insertions(+), 3 deletions(-) diff --git a/src/main/kotlin/ru/quipy/common/utils/SlidingWindowRateLimiter.kt b/src/main/kotlin/ru/quipy/common/utils/SlidingWindowRateLimiter.kt index 1d86848e2..e88f239bd 100644 --- a/src/main/kotlin/ru/quipy/common/utils/SlidingWindowRateLimiter.kt +++ b/src/main/kotlin/ru/quipy/common/utils/SlidingWindowRateLimiter.kt @@ -1,5 +1,7 @@ package ru.quipy.common.utils +import com.github.dockerjava.zerodep.shaded.org.apache.hc.core5.util.Deadline +import com.github.dockerjava.zerodep.shaded.org.apache.hc.core5.util.Timeout import kotlinx.coroutines.CoroutineScope import kotlinx.coroutines.asCoroutineDispatcher import kotlinx.coroutines.delay @@ -37,6 +39,15 @@ class SlidingWindowRateLimiter( } } + fun tickBlocking(timeout: Long): Boolean { + val timeStarted = System.currentTimeMillis() + while (System.currentTimeMillis()-timeStarted < timeout && !tick()) { + Thread.sleep(10) + } + return System.currentTimeMillis()-timeStarted < timeout + } + + data class Measure( val value: Long, val timestamp: Long diff --git a/src/main/kotlin/ru/quipy/payments/logic/PaymentExternalServiceImpl.kt b/src/main/kotlin/ru/quipy/payments/logic/PaymentExternalServiceImpl.kt index e8594adaa..0a72a8b8a 100644 --- a/src/main/kotlin/ru/quipy/payments/logic/PaymentExternalServiceImpl.kt +++ b/src/main/kotlin/ru/quipy/payments/logic/PaymentExternalServiceImpl.kt @@ -51,7 +51,7 @@ class PaymentExternalSystemAdapterImpl( } private val client = OkHttpClient.Builder() - .callTimeout(1100, TimeUnit.MILLISECONDS).build() + .callTimeout(1200, TimeUnit.MILLISECONDS).build() override fun getAccountProperties(): PaymentAccountProperties { return properties @@ -70,7 +70,7 @@ class PaymentExternalSystemAdapterImpl( logger.info("[$accountName] Submit: $paymentId , txId: $transactionId") - if (!rateLimit.tick()) { + if (!rateLimit.tickBlocking(timeToDead(deadline))) { throw TooManyRequestsException(deadline) } val exResult = parallelLimiter.tryAcquire(timeToDead(deadline), TimeUnit.MILLISECONDS) @@ -114,7 +114,7 @@ class PaymentExternalSystemAdapterImpl( e.message ) } - isCompletedRequest = if (!body.result && !(response.code == 429 || response.code in 500..504)) { + isCompletedRequest = if (!body.result && !(response.code >= 500 || response.code == 429)) { if (retryCount < 7) { retryCount++ val backoffTime = (2.0.pow(retryCount.toDouble()) * 10 + Random().nextLong(0, 10)).toLong() From 91242090366937162862131b534013d5c8a30150 Mon Sep 17 00:00:00 2001 From: vaycheslav Date: Tue, 11 Nov 2025 19:16:50 +0300 Subject: [PATCH 3/4] fix:added timeout for window --- .../logic/PaymentExternalServiceImpl.kt | 17 ++++++++++++++++- 1 file changed, 16 insertions(+), 1 deletion(-) diff --git a/src/main/kotlin/ru/quipy/payments/logic/PaymentExternalServiceImpl.kt b/src/main/kotlin/ru/quipy/payments/logic/PaymentExternalServiceImpl.kt index 0a72a8b8a..6f83eefd9 100644 --- a/src/main/kotlin/ru/quipy/payments/logic/PaymentExternalServiceImpl.kt +++ b/src/main/kotlin/ru/quipy/payments/logic/PaymentExternalServiceImpl.kt @@ -7,6 +7,7 @@ import java.util.concurrent.Semaphore import okhttp3.OkHttpClient import okhttp3.Request import okhttp3.RequestBody +import okio.IOException import org.slf4j.LoggerFactory import ru.quipy.common.utils.SlidingWindowRateLimiter import ru.quipy.core.EventSourcingService @@ -51,7 +52,9 @@ class PaymentExternalSystemAdapterImpl( } private val client = OkHttpClient.Builder() - .callTimeout(1200, TimeUnit.MILLISECONDS).build() + .writeTimeout(Duration.ofMillis(1200)) + .readTimeout(Duration.ofMillis(1200)) + .callTimeout(1500, TimeUnit.MILLISECONDS).build() override fun getAccountProperties(): PaymentAccountProperties { return properties @@ -150,6 +153,18 @@ class PaymentExternalSystemAdapterImpl( it.logProcessing(false, now(), transactionId, reason = "Request timeout.") } } + is InterruptedIOException -> { + logger.error("[$accountName] Server timeout for txId: $transactionId, payment: $paymentId", e) + paymentESService.update(paymentId) { + it.logProcessing(false, now(), transactionId, reason = "Server timeout") + } + } + is IOException -> { + logger.error("[$accountName] Server timeout for txId: $transactionId, payment: $paymentId", e) + paymentESService.update(paymentId) { + it.logProcessing(false, now(), transactionId, reason = "Server timeout") + } + } else -> { logger.error("[$accountName] Payment failed for txId: $transactionId, payment: $paymentId", e) From 075a5180c337a43345280af623e0c9f999edd0b5 Mon Sep 17 00:00:00 2001 From: vaycheslav Date: Thu, 13 Nov 2025 19:31:48 +0300 Subject: [PATCH 4/4] fix: try to fix client timeout --- .../common/utils/SlidingWindowRateLimiter.kt | 2 +- .../logic/PaymentExternalServiceImpl.kt | 21 +++++++++---------- 2 files changed, 11 insertions(+), 12 deletions(-) diff --git a/src/main/kotlin/ru/quipy/common/utils/SlidingWindowRateLimiter.kt b/src/main/kotlin/ru/quipy/common/utils/SlidingWindowRateLimiter.kt index e88f239bd..c5bd36a0a 100644 --- a/src/main/kotlin/ru/quipy/common/utils/SlidingWindowRateLimiter.kt +++ b/src/main/kotlin/ru/quipy/common/utils/SlidingWindowRateLimiter.kt @@ -42,7 +42,7 @@ class SlidingWindowRateLimiter( fun tickBlocking(timeout: Long): Boolean { val timeStarted = System.currentTimeMillis() while (System.currentTimeMillis()-timeStarted < timeout && !tick()) { - Thread.sleep(10) + Thread.sleep(2) } return System.currentTimeMillis()-timeStarted < timeout } diff --git a/src/main/kotlin/ru/quipy/payments/logic/PaymentExternalServiceImpl.kt b/src/main/kotlin/ru/quipy/payments/logic/PaymentExternalServiceImpl.kt index 6f83eefd9..b8ee7bd2c 100644 --- a/src/main/kotlin/ru/quipy/payments/logic/PaymentExternalServiceImpl.kt +++ b/src/main/kotlin/ru/quipy/payments/logic/PaymentExternalServiceImpl.kt @@ -52,9 +52,9 @@ class PaymentExternalSystemAdapterImpl( } private val client = OkHttpClient.Builder() - .writeTimeout(Duration.ofMillis(1200)) - .readTimeout(Duration.ofMillis(1200)) - .callTimeout(1500, TimeUnit.MILLISECONDS).build() + .writeTimeout(Duration.ofMillis(1000)) + .readTimeout(Duration.ofMillis(1000)) + .callTimeout(1150, TimeUnit.MILLISECONDS).build() override fun getAccountProperties(): PaymentAccountProperties { return properties @@ -92,14 +92,14 @@ class PaymentExternalSystemAdapterImpl( var isCompletedRequest = false var retryCount = 0 - if (deadline-now() < requestAverageProcessingTime.toMillis()) { + if (timeToDead(deadline) < 0) { paymentESService.update(paymentId) { it.logProcessing(false, now(), transactionId, "deadline was expired") } return } while (!isCompletedRequest && now() < deadline) { - if (deadline-now() < requestAverageProcessingTime.toMillis()) { + if (timeToDead(deadline) < 0) { paymentESService.update(paymentId) { it.logProcessing(false, now(), transactionId, "deadline was expired") } @@ -118,7 +118,7 @@ class PaymentExternalSystemAdapterImpl( ) } isCompletedRequest = if (!body.result && !(response.code >= 500 || response.code == 429)) { - if (retryCount < 7) { + if (retryCount < 3) { retryCount++ val backoffTime = (2.0.pow(retryCount.toDouble()) * 10 + Random().nextLong(0, 10)).toLong() Thread.sleep(backoffTime) @@ -140,7 +140,7 @@ class PaymentExternalSystemAdapterImpl( } } - if (deadline < now()) { + if (timeToDead(deadline) < 0) { paymentESService.update(paymentId) { it.logProcessing(false, now(), transactionId, "deadline was expired") } @@ -184,9 +184,8 @@ class PaymentExternalSystemAdapterImpl( override fun isEnabled() = properties.enabled override fun name() = properties.accountName - -} -public fun timeToDead(deadline: Long): Long { - return deadline - now() + fun timeToDead(deadline: Long): Long { + return deadline - now() - requestAverageProcessingTime.toMillis() + } } public fun now() = System.currentTimeMillis() \ No newline at end of file