diff --git a/.gitignore b/.gitignore index 259113f73..db5578ab1 100644 --- a/.gitignore +++ b/.gitignore @@ -1,35 +1,35 @@ -HELP.md -target/ -!.mvn/wrapper/maven-wrapper.jar -!**/src/main/**/target/ -!**/src/test/**/target/ - -### STS ### -.apt_generated -.classpath -.factorypath -.project -.settings -.springBeans -.sts4-cache - -### IntelliJ IDEA ### -.idea -*.iws -*.iml -*.ipr -./http-client.env.json - -### NetBeans ### -/nbproject/private/ - -/nbbuild/ -/dist/ -/nbdist/ -/.nb-gradle/ -build/ -!**/src/main/**/build/ -!**/src/test/**/build/ - -### VS Code ### -.vscode/ +HELP.md +target/ +!.mvn/wrapper/maven-wrapper.jar +!**/src/main/**/target/ +!**/src/test/**/target/ + +### STS ### +.apt_generated +.classpath +.factorypath +.project +.settings +.springBeans +.sts4-cache + +### IntelliJ IDEA ### +.idea +*.iws +*.iml +*.ipr +./http-client.env.json + +### NetBeans ### +/nbproject/private/ + +/nbbuild/ +/dist/ +/nbdist/ +/.nb-gradle/ +build/ +!**/src/main/**/build/ +!**/src/test/**/build/ + +### VS Code ### +.vscode/ \ No newline at end of file diff --git a/README.md b/README.md index 56d36970f..bc19c4280 100644 --- a/README.md +++ b/README.md @@ -1,7 +1,9 @@ # Template for the HighLoad course + This project is based on [Tiny Event Sourcing library](https://github.com/andrsuh/tiny-event-sourcing) ### Run PostgreSql + This example uses Postgres as an implementation of the Event store. You can see it in `pom.xml`: ``` @@ -12,7 +14,8 @@ This example uses Postgres as an implementation of the Event store. You can see ``` -Thus, you have to run Postgres in order to test this example. Postgres service is included in `docker-compose` file that we have in the root of the project. +Thus, you have to run Postgres in order to test this example. Postgres service is included in `docker-compose` file +that we have in the root of the project. # More comprehensive information about the course, project, how to run tests is here: @@ -21,19 +24,25 @@ https://andrsuh.notion.site/2595d535059281d8a815c2cb3875c376?source=copy_link https://andrsuh.notion.site/2625d5350592801aaf88c7c95302d10c?source=copy_link ### Run the infrastructure + Set of the services you need to start developing and testing process is following: -- Bombardier - service that is in charge of emulation the store's clients activity (creates the incoming load). Also serves as a third-party payment system. + +- Bombardier - service that is in charge of emulation the store's clients activity (creates the incoming load). Also + serves as a third-party payment system. - Postgres DBMS - Prometheus + Grafana - metrics collection and visualization services You can run all beforementioned services by the following command: + ``` docker compose -f docker-compose.yml up ``` ### Run the application -To make the application run you can start the main class `OnlineShopApplication`. It is not being launched as a docker contained to simplify and speed up the devevopment process as it is easier for you to refactor the application and re-run it immediately in the IDE. +To make the application run you can start the main class `OnlineShopApplication`. It is not being launched as a docker +contained to simplify and speed up the devevopment process as it is easier for you to refactor the application and +re-run it immediately in the IDE. ### If you want to pull changes from the main repository into your fork diff --git a/grafana/provisioning/dashboards/ServicesStatistic.json b/grafana/provisioning/dashboards/ServicesStatistic.json index 684b97269..74e9740f0 100644 --- a/grafana/provisioning/dashboards/ServicesStatistic.json +++ b/grafana/provisioning/dashboards/ServicesStatistic.json @@ -3468,6 +3468,358 @@ ], "title": "Jetty Statistics", "type": "row" + }, + { + "collapsed": true, + "gridPos": { + "h": 1, + "w": 24, + "x": 0, + "y": 7 + }, + "id": 101, + "panels": [ + { + "datasource": { + "type": "prometheus", + "uid": "PBFA97CFB590B2093" + }, + "description": "История изменения количества задач в очереди", + "fieldConfig": { + "defaults": { + "color": { + "mode": "palette-classic" + }, + "custom": { + "axisBorderShow": false, + "axisCenteredZero": false, + "axisColorMode": "text", + "axisLabel": "Задачи в очереди", + "axisPlacement": "auto", + "barAlignment": 0, + "drawStyle": "line", + "fillOpacity": 30, + "gradientMode": "opacity", + "hideFrom": { + "legend": false, + "tooltip": false, + "viz": false + }, + "insertNulls": false, + "lineInterpolation": "smooth", + "lineWidth": 2, + "pointSize": 5, + "scaleDistribution": { + "type": "linear" + }, + "showPoints": "never", + "spanNulls": false, + "stacking": { + "group": "A", + "mode": "none" + }, + "thresholdsStyle": { + "mode": "off" + } + }, + "mappings": [], + "min": 0, + "thresholds": { + "mode": "absolute", + "steps": [ + { + "color": "green", + "value": null + } + ] + }, + "unit": "short" + }, + "overrides": [] + }, + "gridPos": { + "h": 8, + "w": 16, + "x": 8, + "y": 8 + }, + "id": 103, + "options": { + "legend": { + "calcs": [ + "mean", + "lastNotNull", + "max" + ], + "displayMode": "table", + "placement": "bottom", + "showLegend": true + }, + "tooltip": { + "mode": "multi", + "sort": "none" + } + }, + "pluginVersion": "9.0.0", + "targets": [ + { + "datasource": { + "type": "prometheus", + "uid": "PBFA97CFB590B2093" + }, + "expr": "payment_processing_planned_total - payment_processing_started_total", + "legendFormat": "{{accountName}}", + "refId": "A" + } + ], + "title": "История задач в очереди", + "type": "timeseries" + }, + { + "datasource": { + "type": "prometheus", + "uid": "PBFA97CFB590B2093" + }, + "description": "Скорость входа в ожидание и выхода из ожидания (requests/sec)", + "fieldConfig": { + "defaults": { + "color": { + "mode": "palette-classic" + }, + "custom": { + "axisBorderShow": false, + "axisCenteredZero": false, + "axisColorMode": "text", + "axisLabel": "Запросов/сек", + "axisPlacement": "auto", + "barAlignment": 0, + "drawStyle": "line", + "fillOpacity": 10, + "gradientMode": "none", + "hideFrom": { + "legend": false, + "tooltip": false, + "viz": false + }, + "insertNulls": false, + "lineInterpolation": "smooth", + "lineWidth": 2, + "pointSize": 5, + "scaleDistribution": { + "type": "linear" + }, + "showPoints": "never", + "spanNulls": false, + "stacking": { + "group": "A", + "mode": "none" + }, + "thresholdsStyle": { + "mode": "off" + } + }, + "mappings": [], + "thresholds": { + "mode": "absolute", + "steps": [ + { + "color": "green", + "value": null + } + ] + }, + "unit": "reqps" + }, + "overrides": [ + { + "matcher": { + "id": "byName", + "options": "Скорость увеличения очереди" + }, + "properties": [ + { + "id": "color", + "value": { + "fixedColor": "green", + "mode": "fixed" + } + } + ] + }, + { + "matcher": { + "id": "byName", + "options": "Скорость числа задач в обработке" + }, + "properties": [ + { + "id": "color", + "value": { + "fixedColor": "blue", + "mode": "fixed" + } + } + ] + } + ] + }, + "gridPos": { + "h": 8, + "w": 12, + "x": 0, + "y": 16 + }, + "id": 104, + "options": { + "legend": { + "calcs": [ + "mean", + "lastNotNull", + "max" + ], + "displayMode": "table", + "placement": "bottom", + "showLegend": true + }, + "tooltip": { + "mode": "multi", + "sort": "none" + } + }, + "pluginVersion": "9.0.0", + "targets": [ + { + "datasource": { + "type": "prometheus", + "uid": "PBFA97CFB590B2093" + }, + "expr": "rate(payment_processing_planned_total[1m])", + "legendFormat": "Попало в очередь: {{accountName}}", + "refId": "A" + }, + { + "datasource": { + "type": "prometheus", + "uid": "PBFA97CFB590B2093" + }, + "expr": "rate(payment_processing_started_total[1m])", + "legendFormat": "Приняты в обработку (вышли из очереди): {{accountName}}", + "refId": "B" + }, + { + "datasource": { + "type": "prometheus", + "uid": "PBFA97CFB590B2093" + }, + "expr": "rate(payment_processing_completed_total[1m])", + "legendFormat": "Обработаны: {{accountName}}", + "refId": "C" + } + ], + "title": "Throughput (в очереди vs в обработке vs обработаны)", + "type": "timeseries" + }, + { + "datasource": { + "type": "prometheus", + "uid": "PBFA97CFB590B2093" + }, + "description": "Если значение > 0, задачи накапливаются быстрее, чем обрабатываются", + "fieldConfig": { + "defaults": { + "color": { + "mode": "palette-classic" + }, + "custom": { + "axisBorderShow": false, + "axisCenteredZero": true, + "axisColorMode": "text", + "axisLabel": "Δ запросов/сек", + "axisPlacement": "auto", + "barAlignment": 0, + "drawStyle": "line", + "fillOpacity": 30, + "gradientMode": "scheme", + "hideFrom": { + "legend": false, + "tooltip": false, + "viz": false + }, + "insertNulls": false, + "lineInterpolation": "smooth", + "lineWidth": 2, + "pointSize": 5, + "scaleDistribution": { + "type": "linear" + }, + "showPoints": "never", + "spanNulls": false, + "stacking": { + "group": "A", + "mode": "none" + }, + "thresholdsStyle": { + "mode": "area" + } + }, + "mappings": [], + "thresholds": { + "mode": "absolute", + "steps": [ + { + "color": "green", + "value": null + }, + { + "color": "red", + "value": 0.1 + } + ] + }, + "unit": "short" + }, + "overrides": [] + }, + "gridPos": { + "h": 8, + "w": 12, + "x": 12, + "y": 16 + }, + "id": 105, + "options": { + "legend": { + "calcs": [ + "mean", + "lastNotNull" + ], + "displayMode": "table", + "placement": "bottom", + "showLegend": true + }, + "tooltip": { + "mode": "multi", + "sort": "none" + } + }, + "pluginVersion": "9.0.0", + "targets": [ + { + "datasource": { + "type": "prometheus", + "uid": "PBFA97CFB590B2093" + }, + "expr": "rate(payment_processing_planned_total[1m]) - rate(payment_processing_started_total[1m])", + "legendFormat": "{{accountName}}", + "refId": "A" + } + ], + "title": "Скорость накопления задач (увеличения очереди)", + "type": "timeseries" + } + ], + "title": "Payment Processing - Active Tasks", + "type": "row" } ], "preload": false, diff --git a/pom.xml b/pom.xml index 5724b2568..e244cb6aa 100644 --- a/pom.xml +++ b/pom.xml @@ -1,5 +1,5 @@ - 4.0.0 @@ -15,7 +15,7 @@ OnlineShop Application for resilience and highly-loaded applications course - + 2.2.0 1.9.0 4.12.0 @@ -25,7 +25,7 @@ 3.1.8 - + io.github.resilience4j resilience4j-ratelimiter @@ -67,15 +67,15 @@ ${jetty.version} - - - - + + + + - - - - + + + + com.fasterxml.jackson.module diff --git a/prometheus/prometheus.yml b/prometheus/prometheus.yml index 936c7edd9..ce045546f 100644 --- a/prometheus/prometheus.yml +++ b/prometheus/prometheus.yml @@ -6,16 +6,16 @@ scrape_configs: - job_name: 'bombardier-docker-network-job' metrics_path: '/actuator/prometheus' static_configs: - - targets: ['bombardier:1234'] + - targets: [ 'bombardier:1234' ] - job_name: 'bombardier-host-job' metrics_path: '/actuator/prometheus' static_configs: - - targets: ['host.docker.internal:1234'] + - targets: [ 'host.docker.internal:1234' ] - job_name: 'online-store-job' metrics_path: '/actuator/prometheus' static_configs: - - targets: ['host.docker.internal:8081'] + - targets: [ 'host.docker.internal:8081' ] - job_name: 'online-shop-job' metrics_path: '/actuator/prometheus' static_configs: - - targets: ['host.docker.internal:18081'] \ No newline at end of file + - targets: [ 'host.docker.internal:18081' ] \ No newline at end of file diff --git a/src/main/kotlin/ru/quipy/apigateway/APIController.kt b/src/main/kotlin/ru/quipy/apigateway/APIController.kt index 6f23fa18d..251b8cdbc 100644 --- a/src/main/kotlin/ru/quipy/apigateway/APIController.kt +++ b/src/main/kotlin/ru/quipy/apigateway/APIController.kt @@ -2,23 +2,16 @@ package ru.quipy.apigateway import org.slf4j.Logger import org.slf4j.LoggerFactory -import org.springframework.beans.factory.annotation.Autowired import org.springframework.web.bind.annotation.* import ru.quipy.orders.repository.OrderRepository import ru.quipy.payments.logic.OrderPayer import java.util.* @RestController -class APIController { +class APIController(private val orderRepository: OrderRepository, private val orderPayer: OrderPayer) { val logger: Logger = LoggerFactory.getLogger(APIController::class.java) - @Autowired - private lateinit var orderRepository: OrderRepository - - @Autowired - private lateinit var orderPayer: OrderPayer - @PostMapping("/users") fun createUser(@RequestBody req: CreateUserRequest): User { return User(UUID.randomUUID(), req.name) @@ -62,7 +55,6 @@ class APIController { it } ?: throw IllegalArgumentException("No such order $orderId") - val createdAt = orderPayer.processPayment(orderId, order.price, paymentId, deadline) return PaymentSubmissionDto(createdAt, paymentId) } diff --git a/src/main/kotlin/ru/quipy/common/utils/FixedWindowRateLimiter.kt b/src/main/kotlin/ru/quipy/common/utils/FixedWindowRateLimiter.kt index 15920bd9c..768c71367 100644 --- a/src/main/kotlin/ru/quipy/common/utils/FixedWindowRateLimiter.kt +++ b/src/main/kotlin/ru/quipy/common/utils/FixedWindowRateLimiter.kt @@ -19,7 +19,7 @@ class FixedWindowRateLimiter( private val rate: Int, private val window: Long, private val timeUnit: TimeUnit = TimeUnit.MINUTES, -): RateLimiter { +) : RateLimiter { companion object { private val logger: Logger = LoggerFactory.getLogger(FixedWindowRateLimiter::class.java) private val counter = AtomicInteger(0) @@ -59,7 +59,7 @@ class SlowStartRateLimiter( private val targetRate: Int, private val timeUnit: TimeUnit = TimeUnit.MINUTES, private val slowStartOn: Boolean = true, -): RateLimiter { +) : RateLimiter { companion object { private val logger: Logger = LoggerFactory.getLogger(SlowStartRateLimiter::class.java) private val counter = AtomicInteger(0) @@ -108,7 +108,7 @@ class CountingRateLimiter( private val rate: Int, private val window: Long, private val timeUnit: TimeUnit = TimeUnit.SECONDS -): RateLimiter { +) : RateLimiter { companion object { private val logger: Logger = LoggerFactory.getLogger(CountingRateLimiter::class.java) } @@ -137,7 +137,11 @@ class CountingRateLimiter( ) } -fun makeRateLimiter(accountName: String, rate: Int, timeUnit: TimeUnit = TimeUnit.SECONDS): io.github.resilience4j.ratelimiter.RateLimiter { +fun makeRateLimiter( + accountName: String, + rate: Int, + timeUnit: TimeUnit = TimeUnit.SECONDS +): io.github.resilience4j.ratelimiter.RateLimiter { val config = RateLimiterConfig.custom() .limitRefreshPeriod(if (timeUnit == TimeUnit.SECONDS) Duration.ofSeconds(1) else Duration.ofMinutes(1)) .limitForPeriod(rate) diff --git a/src/main/kotlin/ru/quipy/common/utils/SlidingWindowRateLimiter.kt b/src/main/kotlin/ru/quipy/common/utils/SlidingWindowRateLimiter.kt index 6ff3092ab..1d86848e2 100644 --- a/src/main/kotlin/ru/quipy/common/utils/SlidingWindowRateLimiter.kt +++ b/src/main/kotlin/ru/quipy/common/utils/SlidingWindowRateLimiter.kt @@ -10,8 +10,6 @@ import java.time.Duration import java.util.concurrent.Executors import java.util.concurrent.PriorityBlockingQueue import java.util.concurrent.atomic.AtomicLong -import java.util.concurrent.locks.ReentrantLock -import kotlin.concurrent.withLock class SlidingWindowRateLimiter( private val rate: Long, @@ -64,6 +62,7 @@ class SlidingWindowRateLimiter( queue.take() } }.invokeOnCompletion { th -> if (th != null) logger.error("Rate limiter release job completed", th) } + companion object { private val logger: Logger = LoggerFactory.getLogger(SlidingWindowRateLimiter::class.java) } diff --git a/src/main/kotlin/ru/quipy/common/utils/TokenBucketRateLimiter.kt b/src/main/kotlin/ru/quipy/common/utils/TokenBucketRateLimiter.kt index a7c9547d7..a0791607f 100644 --- a/src/main/kotlin/ru/quipy/common/utils/TokenBucketRateLimiter.kt +++ b/src/main/kotlin/ru/quipy/common/utils/TokenBucketRateLimiter.kt @@ -15,7 +15,7 @@ class TokenBucketRateLimiter( private val bucketMaxCapacity: Int, private val window: Long, private val timeUnit: TimeUnit = TimeUnit.MINUTES, -): RateLimiter { +) : RateLimiter { companion object { private val logger: Logger = LoggerFactory.getLogger(TokenBucketRateLimiter::class.java) } diff --git a/src/main/kotlin/ru/quipy/config/EventSourcingLibConfiguration.kt b/src/main/kotlin/ru/quipy/config/EventSourcingLibConfiguration.kt index 9bcb80d07..be9861dc8 100644 --- a/src/main/kotlin/ru/quipy/config/EventSourcingLibConfiguration.kt +++ b/src/main/kotlin/ru/quipy/config/EventSourcingLibConfiguration.kt @@ -71,7 +71,8 @@ class EventSourcingLibConfiguration { val jettyServletWebServerFactory = JettyServletWebServerFactory() val c = JettyServerCustomizer { - (it.connectors[0].getConnectionFactory("h2c") as HTTP2CServerConnectionFactory).maxConcurrentStreams = 10_000_000 + (it.connectors[0].getConnectionFactory("h2c") as HTTP2CServerConnectionFactory).maxConcurrentStreams = + 10_000_000 } jettyServletWebServerFactory.serverCustomizers.add(c) diff --git a/src/main/kotlin/ru/quipy/config/RpcControlConfig.kt b/src/main/kotlin/ru/quipy/config/RpcControlConfig.kt new file mode 100644 index 000000000..805399a17 --- /dev/null +++ b/src/main/kotlin/ru/quipy/config/RpcControlConfig.kt @@ -0,0 +1,26 @@ +package ru.quipy.config + +import org.springframework.beans.factory.annotation.Qualifier +import org.springframework.context.annotation.Bean +import org.springframework.context.annotation.Configuration +import ru.quipy.common.utils.SlidingWindowRateLimiter +import ru.quipy.payments.logic.PaymentAccountProperties +import java.time.Duration +import java.util.concurrent.Semaphore + +@Configuration +class RpcControlConfig { + + @Bean + fun getSlidingWindowRateLimiter(accountProperties: PaymentAccountProperties) = + SlidingWindowRateLimiter( + accountProperties.rateLimitPerSec.toLong(), + Duration.ofSeconds(1) + ) + + @Bean + @Qualifier("parallelLimiter") + fun parallelLimiter(accountProperties: PaymentAccountProperties): Semaphore { + return Semaphore(accountProperties.parallelRequests) + } +} diff --git a/src/main/kotlin/ru/quipy/orders/subscribers/PaymentSubscriber.kt b/src/main/kotlin/ru/quipy/orders/subscribers/PaymentSubscriber.kt index 767f23b3e..d5d35985d 100644 --- a/src/main/kotlin/ru/quipy/orders/subscribers/PaymentSubscriber.kt +++ b/src/main/kotlin/ru/quipy/orders/subscribers/PaymentSubscriber.kt @@ -1,5 +1,6 @@ package ru.quipy.orders.subscribers +import io.micrometer.core.instrument.Metrics import jakarta.annotation.PostConstruct import org.slf4j.Logger import org.slf4j.LoggerFactory @@ -19,6 +20,7 @@ class PaymentSubscriber { val logger: Logger = LoggerFactory.getLogger(PaymentSubscriber::class.java) + val paymentSucceededCounter = Metrics.counter("succeeded.payments", "account", "acc-5") @Autowired lateinit var subscriptionsManager: AggregateSubscriptionsManager @@ -42,6 +44,7 @@ class PaymentSubscriber { ).toSeconds() }, spent in queue: ${event.spentInQueueDuration.toSeconds()}" ) + paymentSucceededCounter.increment() } } } diff --git a/src/main/kotlin/ru/quipy/payments/config/PaymentAccountsConfig.kt b/src/main/kotlin/ru/quipy/payments/config/PaymentAccountsConfig.kt index eceb90cff..2842a405f 100644 --- a/src/main/kotlin/ru/quipy/payments/config/PaymentAccountsConfig.kt +++ b/src/main/kotlin/ru/quipy/payments/config/PaymentAccountsConfig.kt @@ -8,7 +8,10 @@ import org.springframework.context.annotation.Bean import org.springframework.context.annotation.Configuration import ru.quipy.core.EventSourcingService import ru.quipy.payments.api.PaymentAggregate -import ru.quipy.payments.logic.* +import ru.quipy.payments.logic.PaymentAccountProperties +import ru.quipy.payments.logic.PaymentAggregateState +import ru.quipy.payments.logic.PaymentExternalSystemAdapter +import ru.quipy.payments.logic.PaymentExternalSystemAdapterImpl import java.net.URI import java.net.http.HttpClient import java.net.http.HttpRequest @@ -61,4 +64,10 @@ class PaymentAccountsConfig { ) } } + + @Bean + fun getAccountProperties(accountAdapters: List): PaymentAccountProperties { + return accountAdapters.firstOrNull()?.getAccountProperties() + ?: throw IllegalStateException("No payment accounts configured") + } } \ No newline at end of file diff --git a/src/main/kotlin/ru/quipy/payments/logic/OrderPayer.kt b/src/main/kotlin/ru/quipy/payments/logic/OrderPayer.kt index a5909b85b..c30a5d160 100644 --- a/src/main/kotlin/ru/quipy/payments/logic/OrderPayer.kt +++ b/src/main/kotlin/ru/quipy/payments/logic/OrderPayer.kt @@ -1,55 +1,88 @@ package ru.quipy.payments.logic +import io.micrometer.core.instrument.Counter +import io.micrometer.core.instrument.Metrics import org.slf4j.Logger import org.slf4j.LoggerFactory -import org.springframework.beans.factory.annotation.Autowired +import org.springframework.beans.factory.annotation.Qualifier import org.springframework.stereotype.Service import ru.quipy.common.utils.CallerBlockingRejectedExecutionHandler import ru.quipy.common.utils.NamedThreadFactory +import ru.quipy.common.utils.SlidingWindowRateLimiter import ru.quipy.core.EventSourcingService import ru.quipy.payments.api.PaymentAggregate +import java.time.Duration import java.util.* import java.util.concurrent.LinkedBlockingQueue +import java.util.concurrent.Semaphore import java.util.concurrent.ThreadPoolExecutor import java.util.concurrent.TimeUnit +import kotlin.random.Random @Service -class OrderPayer { +class OrderPayer( + private val paymentESService: EventSourcingService, + private val paymentService: PaymentService, + private val accountProperties: PaymentAccountProperties, + @field:Qualifier("parallelLimiter") + private val parallelLimiter: Semaphore, +) { + + private val paymentProcessingPlannedCounter: Counter = Metrics.counter("payment.processing.planned", "accountName", accountProperties.accountName) + private val paymentProcessingStartedCounter: Counter = Metrics.counter("payment.processing.started", "accountName", accountProperties.accountName) + private val paymentProcessingCompletedCounter: Counter = Metrics.counter("payment.processing.completed", "accountName", accountProperties.accountName) companion object { val logger: Logger = LoggerFactory.getLogger(OrderPayer::class.java) } - @Autowired - private lateinit var paymentESService: EventSourcingService - - @Autowired - private lateinit var paymentService: PaymentService + private val paymentExecutor: ThreadPoolExecutor by lazy { + ThreadPoolExecutor( + accountProperties.parallelRequests, + accountProperties.parallelRequests, + 0L, + TimeUnit.MILLISECONDS, + LinkedBlockingQueue(accountProperties.parallelRequests * 10), + NamedThreadFactory("payment-submission-executor"), + CallerBlockingRejectedExecutionHandler() + ) + } - private val paymentExecutor = ThreadPoolExecutor( - 16, - 16, - 0L, - TimeUnit.MILLISECONDS, - LinkedBlockingQueue(8_000), - NamedThreadFactory("payment-submission-executor"), - CallerBlockingRejectedExecutionHandler() - ) + private val rateLimit: SlidingWindowRateLimiter by lazy { + SlidingWindowRateLimiter( + rate = accountProperties.rateLimitPerSec.toLong(), + window = Duration.ofSeconds(1), + ) + } fun processPayment(orderId: UUID, amount: Int, paymentId: UUID, deadline: Long): Long { val createdAt = System.currentTimeMillis() - paymentExecutor.submit { - val createdEvent = paymentESService.create { - it.create( - paymentId, - orderId, - amount - ) + + paymentProcessingPlannedCounter.increment() + parallelLimiter.acquire() + + return try { + while (!rateLimit.tick()) { + Thread.sleep(Random.nextLong(0, 100)) } - logger.trace("Payment ${createdEvent.paymentId} for order $orderId created.") - paymentService.submitPaymentRequest(paymentId, amount, createdAt, deadline) + paymentExecutor.submit { + paymentProcessingStartedCounter.increment() + try { + val createdEvent = paymentESService.create { + it.create(paymentId, orderId, amount) + } + logger.trace("Payment {} for order {} created.", createdEvent.paymentId, orderId) + paymentService.submitPaymentRequest(paymentId, amount, createdAt, deadline) + } finally { + parallelLimiter.release() + paymentProcessingCompletedCounter.increment() + } + } + createdAt + } catch (e: Exception) { + parallelLimiter.release() + throw e } - return createdAt } } \ No newline at end of file diff --git a/src/main/kotlin/ru/quipy/payments/logic/PaymentAggregateCommands.kt b/src/main/kotlin/ru/quipy/payments/logic/PaymentAggregateCommands.kt index 2389e3cb3..8f0d735fd 100644 --- a/src/main/kotlin/ru/quipy/payments/logic/PaymentAggregateCommands.kt +++ b/src/main/kotlin/ru/quipy/payments/logic/PaymentAggregateCommands.kt @@ -15,7 +15,12 @@ fun PaymentAggregateState.create(id: UUID, orderId: UUID, amount: Int): PaymentC ) } -fun PaymentAggregateState.logSubmission(success: Boolean, transactionId: UUID, startedAt: Long, spentInQueueDuration: Duration): PaymentSubmittedEvent { +fun PaymentAggregateState.logSubmission( + success: Boolean, + transactionId: UUID, + startedAt: Long, + spentInQueueDuration: Duration +): PaymentSubmittedEvent { return PaymentSubmittedEvent( this.getId(), success, this.orderId, transactionId, startedAt, spentInQueueDuration ) @@ -28,9 +33,18 @@ fun PaymentAggregateState.logProcessing( reason: String? = null ): PaymentProcessedEvent { val submittedAt = this.submissions[transactionId ?: UUID.randomUUID()]?.timeStarted ?: 0 - val spentInQueueDuration = this.submissions[transactionId ?: UUID.randomUUID()]?.spentInQueue ?: Duration.ofMillis(0) + val spentInQueueDuration = + this.submissions[transactionId ?: UUID.randomUUID()]?.spentInQueue ?: Duration.ofMillis(0) return PaymentProcessedEvent( - this.getId(), success, this.orderId, submittedAt, processedAt, this.amount!!, transactionId, reason, spentInQueueDuration + this.getId(), + success, + this.orderId, + submittedAt, + processedAt, + this.amount!!, + transactionId, + reason, + spentInQueueDuration ) } \ No newline at end of file diff --git a/src/main/kotlin/ru/quipy/payments/logic/PaymentAggregateState.kt b/src/main/kotlin/ru/quipy/payments/logic/PaymentAggregateState.kt index ec0dd3709..ab8fcefd9 100644 --- a/src/main/kotlin/ru/quipy/payments/logic/PaymentAggregateState.kt +++ b/src/main/kotlin/ru/quipy/payments/logic/PaymentAggregateState.kt @@ -34,7 +34,8 @@ class PaymentAggregateState : AggregateState { @StateTransitionFunc fun paymentSubmittedApply(event: PaymentSubmittedEvent) { - submissions[event.transactionId] = PaymentSubmission(event.startedAt, event.transactionId, event.success, event.spentInQueueDuration) + submissions[event.transactionId] = + PaymentSubmission(event.startedAt, event.transactionId, event.success, event.spentInQueueDuration) updatedAt = createdAt } diff --git a/src/main/kotlin/ru/quipy/payments/logic/PaymentExternalServiceImpl.kt b/src/main/kotlin/ru/quipy/payments/logic/PaymentExternalServiceImpl.kt index 5cb12106a..c0f4f84f5 100644 --- a/src/main/kotlin/ru/quipy/payments/logic/PaymentExternalServiceImpl.kt +++ b/src/main/kotlin/ru/quipy/payments/logic/PaymentExternalServiceImpl.kt @@ -36,6 +36,10 @@ class PaymentExternalSystemAdapterImpl( private val client = OkHttpClient.Builder().build() + override fun getAccountProperties(): PaymentAccountProperties { + return properties + } + override fun performPaymentAsync(paymentId: UUID, amount: Int, paymentStartedAt: Long, deadline: Long) { logger.warn("[$accountName] Submitting payment request for payment $paymentId") @@ -60,7 +64,7 @@ class PaymentExternalSystemAdapterImpl( mapper.readValue(response.body?.string(), ExternalSysResponse::class.java) } catch (e: Exception) { logger.error("[$accountName] [ERROR] Payment processed for txId: $transactionId, payment: $paymentId, result code: ${response.code}, reason: ${response.body?.string()}") - ExternalSysResponse(transactionId.toString(), paymentId.toString(),false, e.message) + ExternalSysResponse(transactionId.toString(), paymentId.toString(), false, e.message) } logger.warn("[$accountName] Payment processed for txId: $transactionId, payment: $paymentId, succeeded: ${body.result}, message: ${body.message}") diff --git a/src/main/kotlin/ru/quipy/payments/logic/PaymentService.kt b/src/main/kotlin/ru/quipy/payments/logic/PaymentService.kt index 255db77dd..2907c7706 100644 --- a/src/main/kotlin/ru/quipy/payments/logic/PaymentService.kt +++ b/src/main/kotlin/ru/quipy/payments/logic/PaymentService.kt @@ -17,6 +17,8 @@ interface PaymentService { */ interface PaymentExternalSystemAdapter { + fun getAccountProperties(): PaymentAccountProperties + fun performPaymentAsync(paymentId: UUID, amount: Int, paymentStartedAt: Long, deadline: Long) fun name(): String @@ -32,13 +34,21 @@ interface PaymentExternalSystemAdapter { data class PaymentAccountProperties( val serviceName: String, val accountName: String, - val parallelRequests: Int, - val rateLimitPerSec: Int, - val price: Int, + val parallelRequests: Int, // 30 + val rateLimitPerSec: Int, // 10 + val price: Int, // 30 val averageProcessingTime: Duration = Duration.ofSeconds(11), val enabled: Boolean, ) +/* +#- parallelRequests=5 - означает, что провайдер разрешает вам в любой момент времени иметь не более 5 одновременных запросов от вас к нему для этого аккаунта +#- rateLimitPerSec=5 - означает, что провайдер разрешает вам каждую секунду отправлять к нему не более 5 запросов по этому аккаунту +#- price=30 - означает, что провайдер оплаты будет взымать за каждый успешный или неуспешный вызов 30 денежных единиц с вашего магазина. +#- averageProcessingTime=PTO.05S - провайдер оплаты сообщает вам, что в среднем время обработки одного запроса по этому аккаунту будет составлять около 50ms. + + */ + /** * Describes response from external service. */ diff --git a/src/main/kotlin/ru/quipy/payments/logic/PaymentServiceImpl.kt b/src/main/kotlin/ru/quipy/payments/logic/PaymentServiceImpl.kt index 1c24e5a72..7aad9b847 100644 --- a/src/main/kotlin/ru/quipy/payments/logic/PaymentServiceImpl.kt +++ b/src/main/kotlin/ru/quipy/payments/logic/PaymentServiceImpl.kt @@ -1,16 +1,8 @@ package ru.quipy.payments.logic import org.slf4j.LoggerFactory -import org.springframework.beans.factory.annotation.Autowired import org.springframework.stereotype.Service -import ru.quipy.common.utils.NamedThreadFactory -import ru.quipy.core.EventSourcingService -import ru.quipy.payments.api.PaymentAggregate -import java.time.Duration import java.util.* -import java.util.concurrent.Executors -import java.util.concurrent.locks.ReentrantLock -import kotlin.concurrent.withLock @Service diff --git a/src/main/kotlin/ru/quipy/payments/subscribers/PaymentTransactionsSubscriber.kt b/src/main/kotlin/ru/quipy/payments/subscribers/PaymentTransactionsSubscriber.kt index 033b8a7b3..e4d8a4eaf 100644 --- a/src/main/kotlin/ru/quipy/payments/subscribers/PaymentTransactionsSubscriber.kt +++ b/src/main/kotlin/ru/quipy/payments/subscribers/PaymentTransactionsSubscriber.kt @@ -1,5 +1,6 @@ package ru.quipy.payments.subscribers +import jakarta.annotation.PostConstruct import org.slf4j.Logger import org.slf4j.LoggerFactory import org.springframework.beans.factory.annotation.Autowired @@ -12,7 +13,6 @@ import ru.quipy.streams.annotation.RetryFailedStrategy import java.util.* import java.util.concurrent.ConcurrentHashMap import java.util.concurrent.CopyOnWriteArrayList -import jakarta.annotation.PostConstruct @Service class PaymentTransactionsSubscriber { diff --git a/src/main/resources/application.properties b/src/main/resources/application.properties index 33d51a58b..06f5f41a1 100644 --- a/src/main/resources/application.properties +++ b/src/main/resources/application.properties @@ -2,29 +2,26 @@ server.address=0.0.0.0 server.port=8081 server.http2.enabled=true spring.main.allow-bean-definition-overriding=true - # MongoDB properties spring.data.mongodb.host=localhost spring.data.mongodb.port=27017 spring.data.mongodb.database=online-shop - # Tiny event sourcing library properties event.sourcing.auto-scan-enabled=true event.sourcing.scan-package=ru.quipy event.sourcing.snapshots-enabled=false event.sourcing.sagas-enabled=false - # Postgres event store properties spring.datasource.hikari.jdbc-url=jdbc:postgresql://${POSTGRES_ADDRESS:localhost}:${POSTGRES_PORT:65432}/postgres spring.datasource.hikari.username=tiny_es spring.datasource.hikari.password=tiny_es spring.datasource.hikari.leak-detection-threshold=2000 - management.metrics.web.server.request.autotime.percentiles=0.95 management.metrics.export.prometheus.enabled=true management.endpoints.web.exposure.include=info,health,prometheus,metrics - payment.service-name=${PAYMENT_SERVICE_NAME} payment.token=${PAYMENT_TOKEN} -payment.accounts=${PAYMENT_ACCOUNTS:acc-12,acc-20} +# payment.accounts=${PAYMENT_ACCOUNTS:acc-12,acc-20} +# payment.accounts=${PAYMENT_ACCOUNTS:acc-3} +payment.accounts=${PAYMENT_ACCOUNTS:acc-5} payment.hostPort=${PAYMENT_HOST:localhost}:${PAYMENT_PORT:1234} \ No newline at end of file diff --git a/test-local-run.http b/test-local-run.http index 7be0e4f73..cabd61099 100644 --- a/test-local-run.http +++ b/test-local-run.http @@ -5,11 +5,31 @@ Content-Type: application/json { "serviceName": "{{serviceName}}", "token": "{{token}}", - "ratePerSecond": 1, + "ratePerSecond": 2, "testCount": 100, - "processingTimeMillis": 80000 + "processingTimeMillis": 60000 } ### Stop running test to save time and resources # @timeout 120 -POST http://localhost:1234/test/stop/{{serviceName}} \ No newline at end of file +POST http://localhost:1234/test/stop/{{serviceName}} + +# PaymentAccountProperties( +# serviceName=cas-m3404-05, +# accountName=acc-3, +# parallelRequests=30, +# rateLimitPerSec=10, +# price=30, +# averageProcessingTime=PT1S, +# enabled=true +# ) + +# PaymentAccountProperties( +# serviceName=cas-m3404-05, +# accountName=acc-5, +# parallelRequests=5, +# rateLimitPerSec=3, +# price=30, +# averageProcessingTime=PT4.9S, +# enabled=true +#)