From 9c76cb167fb36ef9bc41d04b51db2c0c9a6d63fe Mon Sep 17 00:00:00 2001 From: kuro Date: Thu, 2 Oct 2025 15:40:37 +0300 Subject: [PATCH 1/9] HW-3 is Start. --- src/main/kotlin/ru/quipy/payments/logic/OrderPayer.kt | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/main/kotlin/ru/quipy/payments/logic/OrderPayer.kt b/src/main/kotlin/ru/quipy/payments/logic/OrderPayer.kt index 8c872d6de..efac11f4d 100644 --- a/src/main/kotlin/ru/quipy/payments/logic/OrderPayer.kt +++ b/src/main/kotlin/ru/quipy/payments/logic/OrderPayer.kt @@ -57,7 +57,7 @@ class OrderPayer { ) } - private val parallelLimiter = Semaphore(5) //Good men!!! + private val parallelLimiter = Semaphore(5) suspend fun processPayment(orderId: UUID, amount: Int, paymentId: UUID, deadline: Long): Long { val createdAt = System.currentTimeMillis() From 58e3c1414477d6dade124907f016a2948b0aa31e Mon Sep 17 00:00:00 2001 From: vaycheslav Date: Thu, 2 Oct 2025 19:29:04 +0300 Subject: [PATCH 2/9] refactoring: refactored di in spring --- .../ru/quipy/apigateway/APIController.kt | 8 ++--- .../ru/quipy/config/RpcControlConfig.kt | 26 +++++++++++++++ .../payments/config/PaymentAccountsConfig.kt | 6 ++++ .../ru/quipy/payments/logic/OrderPayer.kt | 32 +++++++------------ 4 files changed, 47 insertions(+), 25 deletions(-) create mode 100644 src/main/kotlin/ru/quipy/config/RpcControlConfig.kt diff --git a/src/main/kotlin/ru/quipy/apigateway/APIController.kt b/src/main/kotlin/ru/quipy/apigateway/APIController.kt index 2bdf631a6..ac378d041 100644 --- a/src/main/kotlin/ru/quipy/apigateway/APIController.kt +++ b/src/main/kotlin/ru/quipy/apigateway/APIController.kt @@ -9,16 +9,16 @@ import ru.quipy.payments.logic.OrderPayer import java.util.* @RestController -class APIController { +class APIController(orderRepository: OrderRepository, orderPayer: OrderPayer){ val logger: Logger = LoggerFactory.getLogger(APIController::class.java) - @Autowired private lateinit var orderRepository: OrderRepository - @Autowired private lateinit var orderPayer: OrderPayer + + @PostMapping("/users") fun createUser(@RequestBody req: CreateUserRequest): User { return User(UUID.randomUUID(), req.name) @@ -55,7 +55,7 @@ class APIController { } @PostMapping("/orders/{orderId}/payment") - suspend fun payOrder(@PathVariable orderId: UUID, @RequestParam deadline: Long): PaymentSubmissionDto { + fun payOrder(@PathVariable orderId: UUID, @RequestParam deadline: Long): PaymentSubmissionDto { val paymentId = UUID.randomUUID() val order = orderRepository.findById(orderId)?.let { orderRepository.save(it.copy(status = OrderStatus.PAYMENT_IN_PROGRESS)) diff --git a/src/main/kotlin/ru/quipy/config/RpcControlConfig.kt b/src/main/kotlin/ru/quipy/config/RpcControlConfig.kt new file mode 100644 index 000000000..805399a17 --- /dev/null +++ b/src/main/kotlin/ru/quipy/config/RpcControlConfig.kt @@ -0,0 +1,26 @@ +package ru.quipy.config + +import org.springframework.beans.factory.annotation.Qualifier +import org.springframework.context.annotation.Bean +import org.springframework.context.annotation.Configuration +import ru.quipy.common.utils.SlidingWindowRateLimiter +import ru.quipy.payments.logic.PaymentAccountProperties +import java.time.Duration +import java.util.concurrent.Semaphore + +@Configuration +class RpcControlConfig { + + @Bean + fun getSlidingWindowRateLimiter(accountProperties: PaymentAccountProperties) = + SlidingWindowRateLimiter( + accountProperties.rateLimitPerSec.toLong(), + Duration.ofSeconds(1) + ) + + @Bean + @Qualifier("parallelLimiter") + fun parallelLimiter(accountProperties: PaymentAccountProperties): Semaphore { + return Semaphore(accountProperties.parallelRequests) + } +} diff --git a/src/main/kotlin/ru/quipy/payments/config/PaymentAccountsConfig.kt b/src/main/kotlin/ru/quipy/payments/config/PaymentAccountsConfig.kt index eceb90cff..33da0ce78 100644 --- a/src/main/kotlin/ru/quipy/payments/config/PaymentAccountsConfig.kt +++ b/src/main/kotlin/ru/quipy/payments/config/PaymentAccountsConfig.kt @@ -61,4 +61,10 @@ class PaymentAccountsConfig { ) } } + + @Bean + fun getAccountProperties(accountAdapters: List): PaymentAccountProperties { + return accountAdapters.firstOrNull()?.getAccountProperties() + ?: throw IllegalStateException("No payment accounts configured") + } } \ No newline at end of file diff --git a/src/main/kotlin/ru/quipy/payments/logic/OrderPayer.kt b/src/main/kotlin/ru/quipy/payments/logic/OrderPayer.kt index efac11f4d..9fcc1c49b 100644 --- a/src/main/kotlin/ru/quipy/payments/logic/OrderPayer.kt +++ b/src/main/kotlin/ru/quipy/payments/logic/OrderPayer.kt @@ -1,9 +1,8 @@ package ru.quipy.payments.logic -import kotlinx.coroutines.delay import org.slf4j.Logger import org.slf4j.LoggerFactory -import org.springframework.beans.factory.annotation.Autowired +import org.springframework.beans.factory.annotation.Qualifier import org.springframework.stereotype.Service import ru.quipy.common.utils.CallerBlockingRejectedExecutionHandler import ru.quipy.common.utils.NamedThreadFactory @@ -16,28 +15,21 @@ import java.util.concurrent.LinkedBlockingQueue import java.util.concurrent.Semaphore import java.util.concurrent.ThreadPoolExecutor import java.util.concurrent.TimeUnit +import kotlin.random.Random @Service -class OrderPayer { +class OrderPayer( + private val paymentESService: EventSourcingService, + private val paymentService: PaymentService, + private val accountProperties: PaymentAccountProperties, + @field:Qualifier("parallelLimiter") + private val parallelLimiter: Semaphore +) { companion object { val logger: Logger = LoggerFactory.getLogger(OrderPayer::class.java) } - @Autowired - private lateinit var paymentESService: EventSourcingService - - @Autowired - private lateinit var paymentService: PaymentService - - @Autowired - private lateinit var accountAdapters: List - - private val accountProperties: PaymentAccountProperties by lazy { - accountAdapters.firstOrNull()?.getAccountProperties() - ?: throw IllegalStateException("No payment accounts configured") - } - private val paymentExecutor: ThreadPoolExecutor by lazy { ThreadPoolExecutor( accountProperties.parallelRequests, @@ -57,16 +49,14 @@ class OrderPayer { ) } - private val parallelLimiter = Semaphore(5) - - suspend fun processPayment(orderId: UUID, amount: Int, paymentId: UUID, deadline: Long): Long { + fun processPayment(orderId: UUID, amount: Int, paymentId: UUID, deadline: Long): Long { val createdAt = System.currentTimeMillis() parallelLimiter.acquire() return try { while (!rateLimit.tick()) { - delay(100) + Thread.sleep(Random.nextLong(0, 100)) } paymentExecutor.submit { From 68b9ff63b07db03faaaf6c4423dd47b7b300150a Mon Sep 17 00:00:00 2001 From: vaycheslav Date: Thu, 2 Oct 2025 19:39:49 +0300 Subject: [PATCH 3/9] refactoring: fixed wildcard imports and reformate code --- README.md | 15 ++++++++++--- pom.xml | 22 +++++++++---------- prometheus/prometheus.yml | 8 +++---- .../ru/quipy/apigateway/APIController.kt | 4 +--- .../common/utils/FixedWindowRateLimiter.kt | 12 ++++++---- .../common/utils/SlidingWindowRateLimiter.kt | 3 +-- .../common/utils/TokenBucketRateLimiter.kt | 2 +- .../config/EventSourcingLibConfiguration.kt | 3 ++- .../payments/config/PaymentAccountsConfig.kt | 5 ++++- .../logic/PaymentAggregateCommands.kt | 20 ++++++++++++++--- .../payments/logic/PaymentAggregateState.kt | 3 ++- .../logic/PaymentExternalServiceImpl.kt | 2 +- .../payments/logic/PaymentServiceImpl.kt | 8 ------- .../PaymentTransactionsSubscriber.kt | 2 +- src/main/resources/application.properties | 5 ----- 15 files changed, 65 insertions(+), 49 deletions(-) diff --git a/README.md b/README.md index 56d36970f..bc19c4280 100644 --- a/README.md +++ b/README.md @@ -1,7 +1,9 @@ # Template for the HighLoad course + This project is based on [Tiny Event Sourcing library](https://github.com/andrsuh/tiny-event-sourcing) ### Run PostgreSql + This example uses Postgres as an implementation of the Event store. You can see it in `pom.xml`: ``` @@ -12,7 +14,8 @@ This example uses Postgres as an implementation of the Event store. You can see ``` -Thus, you have to run Postgres in order to test this example. Postgres service is included in `docker-compose` file that we have in the root of the project. +Thus, you have to run Postgres in order to test this example. Postgres service is included in `docker-compose` file +that we have in the root of the project. # More comprehensive information about the course, project, how to run tests is here: @@ -21,19 +24,25 @@ https://andrsuh.notion.site/2595d535059281d8a815c2cb3875c376?source=copy_link https://andrsuh.notion.site/2625d5350592801aaf88c7c95302d10c?source=copy_link ### Run the infrastructure + Set of the services you need to start developing and testing process is following: -- Bombardier - service that is in charge of emulation the store's clients activity (creates the incoming load). Also serves as a third-party payment system. + +- Bombardier - service that is in charge of emulation the store's clients activity (creates the incoming load). Also + serves as a third-party payment system. - Postgres DBMS - Prometheus + Grafana - metrics collection and visualization services You can run all beforementioned services by the following command: + ``` docker compose -f docker-compose.yml up ``` ### Run the application -To make the application run you can start the main class `OnlineShopApplication`. It is not being launched as a docker contained to simplify and speed up the devevopment process as it is easier for you to refactor the application and re-run it immediately in the IDE. +To make the application run you can start the main class `OnlineShopApplication`. It is not being launched as a docker +contained to simplify and speed up the devevopment process as it is easier for you to refactor the application and +re-run it immediately in the IDE. ### If you want to pull changes from the main repository into your fork diff --git a/pom.xml b/pom.xml index 5724b2568..e244cb6aa 100644 --- a/pom.xml +++ b/pom.xml @@ -1,5 +1,5 @@ - 4.0.0 @@ -15,7 +15,7 @@ OnlineShop Application for resilience and highly-loaded applications course - + 2.2.0 1.9.0 4.12.0 @@ -25,7 +25,7 @@ 3.1.8 - + io.github.resilience4j resilience4j-ratelimiter @@ -67,15 +67,15 @@ ${jetty.version} - - - - + + + + - - - - + + + + com.fasterxml.jackson.module diff --git a/prometheus/prometheus.yml b/prometheus/prometheus.yml index 936c7edd9..ce045546f 100644 --- a/prometheus/prometheus.yml +++ b/prometheus/prometheus.yml @@ -6,16 +6,16 @@ scrape_configs: - job_name: 'bombardier-docker-network-job' metrics_path: '/actuator/prometheus' static_configs: - - targets: ['bombardier:1234'] + - targets: [ 'bombardier:1234' ] - job_name: 'bombardier-host-job' metrics_path: '/actuator/prometheus' static_configs: - - targets: ['host.docker.internal:1234'] + - targets: [ 'host.docker.internal:1234' ] - job_name: 'online-store-job' metrics_path: '/actuator/prometheus' static_configs: - - targets: ['host.docker.internal:8081'] + - targets: [ 'host.docker.internal:8081' ] - job_name: 'online-shop-job' metrics_path: '/actuator/prometheus' static_configs: - - targets: ['host.docker.internal:18081'] \ No newline at end of file + - targets: [ 'host.docker.internal:18081' ] \ No newline at end of file diff --git a/src/main/kotlin/ru/quipy/apigateway/APIController.kt b/src/main/kotlin/ru/quipy/apigateway/APIController.kt index ac378d041..807ae054a 100644 --- a/src/main/kotlin/ru/quipy/apigateway/APIController.kt +++ b/src/main/kotlin/ru/quipy/apigateway/APIController.kt @@ -2,14 +2,13 @@ package ru.quipy.apigateway import org.slf4j.Logger import org.slf4j.LoggerFactory -import org.springframework.beans.factory.annotation.Autowired import org.springframework.web.bind.annotation.* import ru.quipy.orders.repository.OrderRepository import ru.quipy.payments.logic.OrderPayer import java.util.* @RestController -class APIController(orderRepository: OrderRepository, orderPayer: OrderPayer){ +class APIController(orderRepository: OrderRepository, orderPayer: OrderPayer) { val logger: Logger = LoggerFactory.getLogger(APIController::class.java) @@ -18,7 +17,6 @@ class APIController(orderRepository: OrderRepository, orderPayer: OrderPayer){ private lateinit var orderPayer: OrderPayer - @PostMapping("/users") fun createUser(@RequestBody req: CreateUserRequest): User { return User(UUID.randomUUID(), req.name) diff --git a/src/main/kotlin/ru/quipy/common/utils/FixedWindowRateLimiter.kt b/src/main/kotlin/ru/quipy/common/utils/FixedWindowRateLimiter.kt index 15920bd9c..768c71367 100644 --- a/src/main/kotlin/ru/quipy/common/utils/FixedWindowRateLimiter.kt +++ b/src/main/kotlin/ru/quipy/common/utils/FixedWindowRateLimiter.kt @@ -19,7 +19,7 @@ class FixedWindowRateLimiter( private val rate: Int, private val window: Long, private val timeUnit: TimeUnit = TimeUnit.MINUTES, -): RateLimiter { +) : RateLimiter { companion object { private val logger: Logger = LoggerFactory.getLogger(FixedWindowRateLimiter::class.java) private val counter = AtomicInteger(0) @@ -59,7 +59,7 @@ class SlowStartRateLimiter( private val targetRate: Int, private val timeUnit: TimeUnit = TimeUnit.MINUTES, private val slowStartOn: Boolean = true, -): RateLimiter { +) : RateLimiter { companion object { private val logger: Logger = LoggerFactory.getLogger(SlowStartRateLimiter::class.java) private val counter = AtomicInteger(0) @@ -108,7 +108,7 @@ class CountingRateLimiter( private val rate: Int, private val window: Long, private val timeUnit: TimeUnit = TimeUnit.SECONDS -): RateLimiter { +) : RateLimiter { companion object { private val logger: Logger = LoggerFactory.getLogger(CountingRateLimiter::class.java) } @@ -137,7 +137,11 @@ class CountingRateLimiter( ) } -fun makeRateLimiter(accountName: String, rate: Int, timeUnit: TimeUnit = TimeUnit.SECONDS): io.github.resilience4j.ratelimiter.RateLimiter { +fun makeRateLimiter( + accountName: String, + rate: Int, + timeUnit: TimeUnit = TimeUnit.SECONDS +): io.github.resilience4j.ratelimiter.RateLimiter { val config = RateLimiterConfig.custom() .limitRefreshPeriod(if (timeUnit == TimeUnit.SECONDS) Duration.ofSeconds(1) else Duration.ofMinutes(1)) .limitForPeriod(rate) diff --git a/src/main/kotlin/ru/quipy/common/utils/SlidingWindowRateLimiter.kt b/src/main/kotlin/ru/quipy/common/utils/SlidingWindowRateLimiter.kt index 6ff3092ab..1d86848e2 100644 --- a/src/main/kotlin/ru/quipy/common/utils/SlidingWindowRateLimiter.kt +++ b/src/main/kotlin/ru/quipy/common/utils/SlidingWindowRateLimiter.kt @@ -10,8 +10,6 @@ import java.time.Duration import java.util.concurrent.Executors import java.util.concurrent.PriorityBlockingQueue import java.util.concurrent.atomic.AtomicLong -import java.util.concurrent.locks.ReentrantLock -import kotlin.concurrent.withLock class SlidingWindowRateLimiter( private val rate: Long, @@ -64,6 +62,7 @@ class SlidingWindowRateLimiter( queue.take() } }.invokeOnCompletion { th -> if (th != null) logger.error("Rate limiter release job completed", th) } + companion object { private val logger: Logger = LoggerFactory.getLogger(SlidingWindowRateLimiter::class.java) } diff --git a/src/main/kotlin/ru/quipy/common/utils/TokenBucketRateLimiter.kt b/src/main/kotlin/ru/quipy/common/utils/TokenBucketRateLimiter.kt index a7c9547d7..a0791607f 100644 --- a/src/main/kotlin/ru/quipy/common/utils/TokenBucketRateLimiter.kt +++ b/src/main/kotlin/ru/quipy/common/utils/TokenBucketRateLimiter.kt @@ -15,7 +15,7 @@ class TokenBucketRateLimiter( private val bucketMaxCapacity: Int, private val window: Long, private val timeUnit: TimeUnit = TimeUnit.MINUTES, -): RateLimiter { +) : RateLimiter { companion object { private val logger: Logger = LoggerFactory.getLogger(TokenBucketRateLimiter::class.java) } diff --git a/src/main/kotlin/ru/quipy/config/EventSourcingLibConfiguration.kt b/src/main/kotlin/ru/quipy/config/EventSourcingLibConfiguration.kt index 9bcb80d07..be9861dc8 100644 --- a/src/main/kotlin/ru/quipy/config/EventSourcingLibConfiguration.kt +++ b/src/main/kotlin/ru/quipy/config/EventSourcingLibConfiguration.kt @@ -71,7 +71,8 @@ class EventSourcingLibConfiguration { val jettyServletWebServerFactory = JettyServletWebServerFactory() val c = JettyServerCustomizer { - (it.connectors[0].getConnectionFactory("h2c") as HTTP2CServerConnectionFactory).maxConcurrentStreams = 10_000_000 + (it.connectors[0].getConnectionFactory("h2c") as HTTP2CServerConnectionFactory).maxConcurrentStreams = + 10_000_000 } jettyServletWebServerFactory.serverCustomizers.add(c) diff --git a/src/main/kotlin/ru/quipy/payments/config/PaymentAccountsConfig.kt b/src/main/kotlin/ru/quipy/payments/config/PaymentAccountsConfig.kt index 33da0ce78..2842a405f 100644 --- a/src/main/kotlin/ru/quipy/payments/config/PaymentAccountsConfig.kt +++ b/src/main/kotlin/ru/quipy/payments/config/PaymentAccountsConfig.kt @@ -8,7 +8,10 @@ import org.springframework.context.annotation.Bean import org.springframework.context.annotation.Configuration import ru.quipy.core.EventSourcingService import ru.quipy.payments.api.PaymentAggregate -import ru.quipy.payments.logic.* +import ru.quipy.payments.logic.PaymentAccountProperties +import ru.quipy.payments.logic.PaymentAggregateState +import ru.quipy.payments.logic.PaymentExternalSystemAdapter +import ru.quipy.payments.logic.PaymentExternalSystemAdapterImpl import java.net.URI import java.net.http.HttpClient import java.net.http.HttpRequest diff --git a/src/main/kotlin/ru/quipy/payments/logic/PaymentAggregateCommands.kt b/src/main/kotlin/ru/quipy/payments/logic/PaymentAggregateCommands.kt index 2389e3cb3..8f0d735fd 100644 --- a/src/main/kotlin/ru/quipy/payments/logic/PaymentAggregateCommands.kt +++ b/src/main/kotlin/ru/quipy/payments/logic/PaymentAggregateCommands.kt @@ -15,7 +15,12 @@ fun PaymentAggregateState.create(id: UUID, orderId: UUID, amount: Int): PaymentC ) } -fun PaymentAggregateState.logSubmission(success: Boolean, transactionId: UUID, startedAt: Long, spentInQueueDuration: Duration): PaymentSubmittedEvent { +fun PaymentAggregateState.logSubmission( + success: Boolean, + transactionId: UUID, + startedAt: Long, + spentInQueueDuration: Duration +): PaymentSubmittedEvent { return PaymentSubmittedEvent( this.getId(), success, this.orderId, transactionId, startedAt, spentInQueueDuration ) @@ -28,9 +33,18 @@ fun PaymentAggregateState.logProcessing( reason: String? = null ): PaymentProcessedEvent { val submittedAt = this.submissions[transactionId ?: UUID.randomUUID()]?.timeStarted ?: 0 - val spentInQueueDuration = this.submissions[transactionId ?: UUID.randomUUID()]?.spentInQueue ?: Duration.ofMillis(0) + val spentInQueueDuration = + this.submissions[transactionId ?: UUID.randomUUID()]?.spentInQueue ?: Duration.ofMillis(0) return PaymentProcessedEvent( - this.getId(), success, this.orderId, submittedAt, processedAt, this.amount!!, transactionId, reason, spentInQueueDuration + this.getId(), + success, + this.orderId, + submittedAt, + processedAt, + this.amount!!, + transactionId, + reason, + spentInQueueDuration ) } \ No newline at end of file diff --git a/src/main/kotlin/ru/quipy/payments/logic/PaymentAggregateState.kt b/src/main/kotlin/ru/quipy/payments/logic/PaymentAggregateState.kt index ec0dd3709..ab8fcefd9 100644 --- a/src/main/kotlin/ru/quipy/payments/logic/PaymentAggregateState.kt +++ b/src/main/kotlin/ru/quipy/payments/logic/PaymentAggregateState.kt @@ -34,7 +34,8 @@ class PaymentAggregateState : AggregateState { @StateTransitionFunc fun paymentSubmittedApply(event: PaymentSubmittedEvent) { - submissions[event.transactionId] = PaymentSubmission(event.startedAt, event.transactionId, event.success, event.spentInQueueDuration) + submissions[event.transactionId] = + PaymentSubmission(event.startedAt, event.transactionId, event.success, event.spentInQueueDuration) updatedAt = createdAt } diff --git a/src/main/kotlin/ru/quipy/payments/logic/PaymentExternalServiceImpl.kt b/src/main/kotlin/ru/quipy/payments/logic/PaymentExternalServiceImpl.kt index 905b3088b..c0f4f84f5 100644 --- a/src/main/kotlin/ru/quipy/payments/logic/PaymentExternalServiceImpl.kt +++ b/src/main/kotlin/ru/quipy/payments/logic/PaymentExternalServiceImpl.kt @@ -64,7 +64,7 @@ class PaymentExternalSystemAdapterImpl( mapper.readValue(response.body?.string(), ExternalSysResponse::class.java) } catch (e: Exception) { logger.error("[$accountName] [ERROR] Payment processed for txId: $transactionId, payment: $paymentId, result code: ${response.code}, reason: ${response.body?.string()}") - ExternalSysResponse(transactionId.toString(), paymentId.toString(),false, e.message) + ExternalSysResponse(transactionId.toString(), paymentId.toString(), false, e.message) } logger.warn("[$accountName] Payment processed for txId: $transactionId, payment: $paymentId, succeeded: ${body.result}, message: ${body.message}") diff --git a/src/main/kotlin/ru/quipy/payments/logic/PaymentServiceImpl.kt b/src/main/kotlin/ru/quipy/payments/logic/PaymentServiceImpl.kt index 1c24e5a72..7aad9b847 100644 --- a/src/main/kotlin/ru/quipy/payments/logic/PaymentServiceImpl.kt +++ b/src/main/kotlin/ru/quipy/payments/logic/PaymentServiceImpl.kt @@ -1,16 +1,8 @@ package ru.quipy.payments.logic import org.slf4j.LoggerFactory -import org.springframework.beans.factory.annotation.Autowired import org.springframework.stereotype.Service -import ru.quipy.common.utils.NamedThreadFactory -import ru.quipy.core.EventSourcingService -import ru.quipy.payments.api.PaymentAggregate -import java.time.Duration import java.util.* -import java.util.concurrent.Executors -import java.util.concurrent.locks.ReentrantLock -import kotlin.concurrent.withLock @Service diff --git a/src/main/kotlin/ru/quipy/payments/subscribers/PaymentTransactionsSubscriber.kt b/src/main/kotlin/ru/quipy/payments/subscribers/PaymentTransactionsSubscriber.kt index 033b8a7b3..e4d8a4eaf 100644 --- a/src/main/kotlin/ru/quipy/payments/subscribers/PaymentTransactionsSubscriber.kt +++ b/src/main/kotlin/ru/quipy/payments/subscribers/PaymentTransactionsSubscriber.kt @@ -1,5 +1,6 @@ package ru.quipy.payments.subscribers +import jakarta.annotation.PostConstruct import org.slf4j.Logger import org.slf4j.LoggerFactory import org.springframework.beans.factory.annotation.Autowired @@ -12,7 +13,6 @@ import ru.quipy.streams.annotation.RetryFailedStrategy import java.util.* import java.util.concurrent.ConcurrentHashMap import java.util.concurrent.CopyOnWriteArrayList -import jakarta.annotation.PostConstruct @Service class PaymentTransactionsSubscriber { diff --git a/src/main/resources/application.properties b/src/main/resources/application.properties index 4e3ebfbc4..06f5f41a1 100644 --- a/src/main/resources/application.properties +++ b/src/main/resources/application.properties @@ -2,28 +2,23 @@ server.address=0.0.0.0 server.port=8081 server.http2.enabled=true spring.main.allow-bean-definition-overriding=true - # MongoDB properties spring.data.mongodb.host=localhost spring.data.mongodb.port=27017 spring.data.mongodb.database=online-shop - # Tiny event sourcing library properties event.sourcing.auto-scan-enabled=true event.sourcing.scan-package=ru.quipy event.sourcing.snapshots-enabled=false event.sourcing.sagas-enabled=false - # Postgres event store properties spring.datasource.hikari.jdbc-url=jdbc:postgresql://${POSTGRES_ADDRESS:localhost}:${POSTGRES_PORT:65432}/postgres spring.datasource.hikari.username=tiny_es spring.datasource.hikari.password=tiny_es spring.datasource.hikari.leak-detection-threshold=2000 - management.metrics.web.server.request.autotime.percentiles=0.95 management.metrics.export.prometheus.enabled=true management.endpoints.web.exposure.include=info,health,prometheus,metrics - payment.service-name=${PAYMENT_SERVICE_NAME} payment.token=${PAYMENT_TOKEN} # payment.accounts=${PAYMENT_ACCOUNTS:acc-12,acc-20} From 74a87c3d7f56966b8100be3446cb9169fa4916f6 Mon Sep 17 00:00:00 2001 From: vaycheslav Date: Thu, 2 Oct 2025 20:12:07 +0300 Subject: [PATCH 4/9] hotfix: fixed bean resolution in constructor --- src/main/kotlin/ru/quipy/apigateway/APIController.kt | 7 +------ 1 file changed, 1 insertion(+), 6 deletions(-) diff --git a/src/main/kotlin/ru/quipy/apigateway/APIController.kt b/src/main/kotlin/ru/quipy/apigateway/APIController.kt index 807ae054a..251b8cdbc 100644 --- a/src/main/kotlin/ru/quipy/apigateway/APIController.kt +++ b/src/main/kotlin/ru/quipy/apigateway/APIController.kt @@ -8,15 +8,10 @@ import ru.quipy.payments.logic.OrderPayer import java.util.* @RestController -class APIController(orderRepository: OrderRepository, orderPayer: OrderPayer) { +class APIController(private val orderRepository: OrderRepository, private val orderPayer: OrderPayer) { val logger: Logger = LoggerFactory.getLogger(APIController::class.java) - private lateinit var orderRepository: OrderRepository - - private lateinit var orderPayer: OrderPayer - - @PostMapping("/users") fun createUser(@RequestBody req: CreateUserRequest): User { return User(UUID.randomUUID(), req.name) From 86e0d3ce1fcff2d30e28cf4c10aa3463f483525e Mon Sep 17 00:00:00 2001 From: vaycheslav Date: Fri, 3 Oct 2025 19:24:08 +0300 Subject: [PATCH 5/9] feature: added counters --- .../ru/quipy/orders/subscribers/PaymentSubscriber.kt | 12 +++++++++++- .../kotlin/ru/quipy/payments/logic/OrderPayer.kt | 10 +++++++++- 2 files changed, 20 insertions(+), 2 deletions(-) diff --git a/src/main/kotlin/ru/quipy/orders/subscribers/PaymentSubscriber.kt b/src/main/kotlin/ru/quipy/orders/subscribers/PaymentSubscriber.kt index 767f23b3e..dfcd32ef5 100644 --- a/src/main/kotlin/ru/quipy/orders/subscribers/PaymentSubscriber.kt +++ b/src/main/kotlin/ru/quipy/orders/subscribers/PaymentSubscriber.kt @@ -1,5 +1,7 @@ package ru.quipy.orders.subscribers +import io.micrometer.core.instrument.Counter +import io.micrometer.core.instrument.MeterRegistry import jakarta.annotation.PostConstruct import org.slf4j.Logger import org.slf4j.LoggerFactory @@ -15,10 +17,17 @@ import ru.quipy.streams.annotation.RetryFailedStrategy import java.time.Duration @Service -class PaymentSubscriber { +class PaymentSubscriber(private val meterRegistry: MeterRegistry) { val logger: Logger = LoggerFactory.getLogger(PaymentSubscriber::class.java) + val paymentSucceededCounter = Counter + .builder("payments_succeeded") + .tag("payment count", "time") + .register(meterRegistry) + + + @Autowired lateinit var subscriptionsManager: AggregateSubscriptionsManager @@ -42,6 +51,7 @@ class PaymentSubscriber { ).toSeconds() }, spent in queue: ${event.spentInQueueDuration.toSeconds()}" ) + paymentSucceededCounter.increment() } } } diff --git a/src/main/kotlin/ru/quipy/payments/logic/OrderPayer.kt b/src/main/kotlin/ru/quipy/payments/logic/OrderPayer.kt index 9fcc1c49b..5927d7f51 100644 --- a/src/main/kotlin/ru/quipy/payments/logic/OrderPayer.kt +++ b/src/main/kotlin/ru/quipy/payments/logic/OrderPayer.kt @@ -1,5 +1,7 @@ package ru.quipy.payments.logic +import io.micrometer.core.instrument.Counter +import io.micrometer.core.instrument.MeterRegistry import org.slf4j.Logger import org.slf4j.LoggerFactory import org.springframework.beans.factory.annotation.Qualifier @@ -23,9 +25,13 @@ class OrderPayer( private val paymentService: PaymentService, private val accountProperties: PaymentAccountProperties, @field:Qualifier("parallelLimiter") - private val parallelLimiter: Semaphore + private val parallelLimiter: Semaphore, + private val meterRegistry: MeterRegistry ) { + private val counterInc: Counter = Counter.builder("queue_in").tag("count", "time").register(meterRegistry) + private val counterDec: Counter = Counter.builder("queue_out").tag("count", "time").register(meterRegistry) + companion object { val logger: Logger = LoggerFactory.getLogger(OrderPayer::class.java) } @@ -60,6 +66,7 @@ class OrderPayer( } paymentExecutor.submit { + counterInc.increment() try { val createdEvent = paymentESService.create { it.create(paymentId, orderId, amount) @@ -68,6 +75,7 @@ class OrderPayer( paymentService.submitPaymentRequest(paymentId, amount, createdAt, deadline) } finally { parallelLimiter.release() + counterDec.increment() } } createdAt From 3970682abf2fc1c5ef3b2e85f6efd2a7f0a81344 Mon Sep 17 00:00:00 2001 From: vaycheslav Date: Fri, 3 Oct 2025 19:42:34 +0300 Subject: [PATCH 6/9] feature: added counters --- .../ru/quipy/orders/subscribers/PaymentSubscriber.kt | 11 +++-------- src/main/kotlin/ru/quipy/payments/logic/OrderPayer.kt | 6 +++--- 2 files changed, 6 insertions(+), 11 deletions(-) diff --git a/src/main/kotlin/ru/quipy/orders/subscribers/PaymentSubscriber.kt b/src/main/kotlin/ru/quipy/orders/subscribers/PaymentSubscriber.kt index dfcd32ef5..6c3b6784a 100644 --- a/src/main/kotlin/ru/quipy/orders/subscribers/PaymentSubscriber.kt +++ b/src/main/kotlin/ru/quipy/orders/subscribers/PaymentSubscriber.kt @@ -2,6 +2,7 @@ package ru.quipy.orders.subscribers import io.micrometer.core.instrument.Counter import io.micrometer.core.instrument.MeterRegistry +import io.micrometer.core.instrument.Metrics import jakarta.annotation.PostConstruct import org.slf4j.Logger import org.slf4j.LoggerFactory @@ -17,17 +18,11 @@ import ru.quipy.streams.annotation.RetryFailedStrategy import java.time.Duration @Service -class PaymentSubscriber(private val meterRegistry: MeterRegistry) { +class PaymentSubscriber { val logger: Logger = LoggerFactory.getLogger(PaymentSubscriber::class.java) - val paymentSucceededCounter = Counter - .builder("payments_succeeded") - .tag("payment count", "time") - .register(meterRegistry) - - - + val paymentSucceededCounter = Metrics.counter("succeeded.payments", "account", "acc-5") @Autowired lateinit var subscriptionsManager: AggregateSubscriptionsManager diff --git a/src/main/kotlin/ru/quipy/payments/logic/OrderPayer.kt b/src/main/kotlin/ru/quipy/payments/logic/OrderPayer.kt index 5927d7f51..e553b8237 100644 --- a/src/main/kotlin/ru/quipy/payments/logic/OrderPayer.kt +++ b/src/main/kotlin/ru/quipy/payments/logic/OrderPayer.kt @@ -2,6 +2,7 @@ package ru.quipy.payments.logic import io.micrometer.core.instrument.Counter import io.micrometer.core.instrument.MeterRegistry +import io.micrometer.core.instrument.Metrics import org.slf4j.Logger import org.slf4j.LoggerFactory import org.springframework.beans.factory.annotation.Qualifier @@ -26,11 +27,10 @@ class OrderPayer( private val accountProperties: PaymentAccountProperties, @field:Qualifier("parallelLimiter") private val parallelLimiter: Semaphore, - private val meterRegistry: MeterRegistry ) { - private val counterInc: Counter = Counter.builder("queue_in").tag("count", "time").register(meterRegistry) - private val counterDec: Counter = Counter.builder("queue_out").tag("count", "time").register(meterRegistry) + private val counterInc: Counter = Metrics.counter("queue.in", "account", "acc-5") + private val counterDec: Counter = Metrics.counter("queue.out", "account", "acc-5") companion object { val logger: Logger = LoggerFactory.getLogger(OrderPayer::class.java) From 21e6f81acb3b7e7d499a2e598141f5abed60c504 Mon Sep 17 00:00:00 2001 From: Ioann Boltonov Date: Fri, 10 Oct 2025 21:43:33 +0300 Subject: [PATCH 7/9] feat: new dashboards for active tasks --- .gitignore | Bin 674 -> 452 bytes .../dashboards/ServicesStatistic.json | 343 ++++++++++++++++++ .../ru/quipy/payments/logic/OrderPayer.kt | 9 +- 3 files changed, 347 insertions(+), 5 deletions(-) diff --git a/.gitignore b/.gitignore index 1c76414f13e4643f87012bada9b577d63355ef6d..db5578ab18f7ece42a8df67e603b3ebe451ce600 100644 GIT binary patch delta 7 OcmZ3)dW3ny5k>$E3 0, задачи накапливаются быстрее, чем обрабатываются", + "fieldConfig": { + "defaults": { + "color": { + "mode": "palette-classic" + }, + "custom": { + "axisBorderShow": false, + "axisCenteredZero": true, + "axisColorMode": "text", + "axisLabel": "Δ запросов/сек", + "axisPlacement": "auto", + "barAlignment": 0, + "drawStyle": "line", + "fillOpacity": 30, + "gradientMode": "scheme", + "hideFrom": { + "legend": false, + "tooltip": false, + "viz": false + }, + "insertNulls": false, + "lineInterpolation": "smooth", + "lineWidth": 2, + "pointSize": 5, + "scaleDistribution": { + "type": "linear" + }, + "showPoints": "never", + "spanNulls": false, + "stacking": { + "group": "A", + "mode": "none" + }, + "thresholdsStyle": { + "mode": "area" + } + }, + "mappings": [], + "thresholds": { + "mode": "absolute", + "steps": [ + { + "color": "green", + "value": null + }, + { + "color": "red", + "value": 0.1 + } + ] + }, + "unit": "short" + }, + "overrides": [] + }, + "gridPos": { + "h": 8, + "w": 12, + "x": 12, + "y": 16 + }, + "id": 105, + "options": { + "legend": { + "calcs": [ + "mean", + "lastNotNull" + ], + "displayMode": "table", + "placement": "bottom", + "showLegend": true + }, + "tooltip": { + "mode": "multi", + "sort": "none" + } + }, + "pluginVersion": "9.0.0", + "targets": [ + { + "datasource": { + "type": "prometheus", + "uid": "PBFA97CFB590B2093" + }, + "expr": "rate(payment_processing_started_total[1m]) - rate(payment_processing_completed_total[1m])", + "legendFormat": "{{accountName}}", + "refId": "A" + } + ], + "title": "Скорость накопления задач (критичный показатель)", + "type": "timeseries" + } + ], + "title": "Payment Processing - Active Tasks", + "type": "row" } ], "preload": false, diff --git a/src/main/kotlin/ru/quipy/payments/logic/OrderPayer.kt b/src/main/kotlin/ru/quipy/payments/logic/OrderPayer.kt index e553b8237..3df99a066 100644 --- a/src/main/kotlin/ru/quipy/payments/logic/OrderPayer.kt +++ b/src/main/kotlin/ru/quipy/payments/logic/OrderPayer.kt @@ -1,7 +1,6 @@ package ru.quipy.payments.logic import io.micrometer.core.instrument.Counter -import io.micrometer.core.instrument.MeterRegistry import io.micrometer.core.instrument.Metrics import org.slf4j.Logger import org.slf4j.LoggerFactory @@ -29,8 +28,8 @@ class OrderPayer( private val parallelLimiter: Semaphore, ) { - private val counterInc: Counter = Metrics.counter("queue.in", "account", "acc-5") - private val counterDec: Counter = Metrics.counter("queue.out", "account", "acc-5") + private val paymentProcessingStartedCounter: Counter = Metrics.counter("payment.processing.started", "accountName", accountProperties.accountName) + private val paymentProcessingCompletedCounter: Counter = Metrics.counter("payment.processing.completed", "accountName", accountProperties.accountName) companion object { val logger: Logger = LoggerFactory.getLogger(OrderPayer::class.java) @@ -66,7 +65,7 @@ class OrderPayer( } paymentExecutor.submit { - counterInc.increment() + paymentProcessingStartedCounter.increment() try { val createdEvent = paymentESService.create { it.create(paymentId, orderId, amount) @@ -75,7 +74,7 @@ class OrderPayer( paymentService.submitPaymentRequest(paymentId, amount, createdAt, deadline) } finally { parallelLimiter.release() - counterDec.increment() + paymentProcessingCompletedCounter.increment() } } createdAt From 6aabbfd4aadd4ece6b5b36ce336a28df21716396 Mon Sep 17 00:00:00 2001 From: Ioann Boltonov Date: Fri, 10 Oct 2025 21:46:06 +0300 Subject: [PATCH 8/9] fix: unused imports --- .../kotlin/ru/quipy/orders/subscribers/PaymentSubscriber.kt | 2 -- 1 file changed, 2 deletions(-) diff --git a/src/main/kotlin/ru/quipy/orders/subscribers/PaymentSubscriber.kt b/src/main/kotlin/ru/quipy/orders/subscribers/PaymentSubscriber.kt index 6c3b6784a..d5d35985d 100644 --- a/src/main/kotlin/ru/quipy/orders/subscribers/PaymentSubscriber.kt +++ b/src/main/kotlin/ru/quipy/orders/subscribers/PaymentSubscriber.kt @@ -1,7 +1,5 @@ package ru.quipy.orders.subscribers -import io.micrometer.core.instrument.Counter -import io.micrometer.core.instrument.MeterRegistry import io.micrometer.core.instrument.Metrics import jakarta.annotation.PostConstruct import org.slf4j.Logger From 2a456a1a672e2807f3907518b146a7e45875819b Mon Sep 17 00:00:00 2001 From: vaycheslav Date: Thu, 16 Oct 2025 19:40:55 +0300 Subject: [PATCH 9/9] fix: fixed dashboard description and added metric for queue monitoring --- .../dashboards/ServicesStatistic.json | 37 ++++++++++++------- .../ru/quipy/payments/logic/OrderPayer.kt | 2 + 2 files changed, 25 insertions(+), 14 deletions(-) diff --git a/grafana/provisioning/dashboards/ServicesStatistic.json b/grafana/provisioning/dashboards/ServicesStatistic.json index 79a28c798..74e9740f0 100644 --- a/grafana/provisioning/dashboards/ServicesStatistic.json +++ b/grafana/provisioning/dashboards/ServicesStatistic.json @@ -3484,7 +3484,7 @@ "type": "prometheus", "uid": "PBFA97CFB590B2093" }, - "description": "История изменения количества активных задач", + "description": "История изменения количества задач в очереди", "fieldConfig": { "defaults": { "color": { @@ -3494,7 +3494,7 @@ "axisBorderShow": false, "axisCenteredZero": false, "axisColorMode": "text", - "axisLabel": "Задачи в обработке", + "axisLabel": "Задачи в очереди", "axisPlacement": "auto", "barAlignment": 0, "drawStyle": "line", @@ -3567,12 +3567,12 @@ "type": "prometheus", "uid": "PBFA97CFB590B2093" }, - "expr": "payment_processing_started_total - payment_processing_completed_total", + "expr": "payment_processing_planned_total - payment_processing_started_total", "legendFormat": "{{accountName}}", "refId": "A" } ], - "title": "История активных задач", + "title": "История задач в очереди", "type": "timeseries" }, { @@ -3580,7 +3580,7 @@ "type": "prometheus", "uid": "PBFA97CFB590B2093" }, - "description": "Скорость входа и выхода задач из обработки (requests/sec)", + "description": "Скорость входа в ожидание и выхода из ожидания (requests/sec)", "fieldConfig": { "defaults": { "color": { @@ -3634,7 +3634,7 @@ { "matcher": { "id": "byName", - "options": "Скорость входа" + "options": "Скорость увеличения очереди" }, "properties": [ { @@ -3649,7 +3649,7 @@ { "matcher": { "id": "byName", - "options": "Скорость выхода" + "options": "Скорость числа задач в обработке" }, "properties": [ { @@ -3693,8 +3693,8 @@ "type": "prometheus", "uid": "PBFA97CFB590B2093" }, - "expr": "rate(payment_processing_started_total[1m])", - "legendFormat": "Вход: {{accountName}}", + "expr": "rate(payment_processing_planned_total[1m])", + "legendFormat": "Попало в очередь: {{accountName}}", "refId": "A" }, { @@ -3702,12 +3702,21 @@ "type": "prometheus", "uid": "PBFA97CFB590B2093" }, - "expr": "rate(payment_processing_completed_total[1m])", - "legendFormat": "Выход: {{accountName}}", + "expr": "rate(payment_processing_started_total[1m])", + "legendFormat": "Приняты в обработку (вышли из очереди): {{accountName}}", "refId": "B" + }, + { + "datasource": { + "type": "prometheus", + "uid": "PBFA97CFB590B2093" + }, + "expr": "rate(payment_processing_completed_total[1m])", + "legendFormat": "Обработаны: {{accountName}}", + "refId": "C" } ], - "title": "Throughput (вход vs выход)", + "title": "Throughput (в очереди vs в обработке vs обработаны)", "type": "timeseries" }, { @@ -3800,12 +3809,12 @@ "type": "prometheus", "uid": "PBFA97CFB590B2093" }, - "expr": "rate(payment_processing_started_total[1m]) - rate(payment_processing_completed_total[1m])", + "expr": "rate(payment_processing_planned_total[1m]) - rate(payment_processing_started_total[1m])", "legendFormat": "{{accountName}}", "refId": "A" } ], - "title": "Скорость накопления задач (критичный показатель)", + "title": "Скорость накопления задач (увеличения очереди)", "type": "timeseries" } ], diff --git a/src/main/kotlin/ru/quipy/payments/logic/OrderPayer.kt b/src/main/kotlin/ru/quipy/payments/logic/OrderPayer.kt index 3df99a066..c30a5d160 100644 --- a/src/main/kotlin/ru/quipy/payments/logic/OrderPayer.kt +++ b/src/main/kotlin/ru/quipy/payments/logic/OrderPayer.kt @@ -28,6 +28,7 @@ class OrderPayer( private val parallelLimiter: Semaphore, ) { + private val paymentProcessingPlannedCounter: Counter = Metrics.counter("payment.processing.planned", "accountName", accountProperties.accountName) private val paymentProcessingStartedCounter: Counter = Metrics.counter("payment.processing.started", "accountName", accountProperties.accountName) private val paymentProcessingCompletedCounter: Counter = Metrics.counter("payment.processing.completed", "accountName", accountProperties.accountName) @@ -57,6 +58,7 @@ class OrderPayer( fun processPayment(orderId: UUID, amount: Int, paymentId: UUID, deadline: Long): Long { val createdAt = System.currentTimeMillis() + paymentProcessingPlannedCounter.increment() parallelLimiter.acquire() return try {