diff --git a/.gitignore b/.gitignore index db5578ab1..cf4961ef7 100644 --- a/.gitignore +++ b/.gitignore @@ -4,6 +4,19 @@ target/ !**/src/main/**/target/ !**/src/test/**/target/ +### grafana ### +/grafana/ + +### prometheus ### + +/prometheus/ + +### confidential data ### + +http-client.env.json + + + ### STS ### .apt_generated .classpath @@ -18,7 +31,6 @@ target/ *.iws *.iml *.ipr -./http-client.env.json ### NetBeans ### /nbproject/private/ diff --git a/Dockerfile b/Dockerfile index cc6f2e042..3d9587cd0 100644 --- a/Dockerfile +++ b/Dockerfile @@ -6,7 +6,7 @@ RUN mvn dependency:go-offline COPY src src RUN mvn package -FROM openjdk:17-jdk-slim +FROM eclipse-temurin:17-alpine-3.22 COPY --from=build /app/target/*.jar /high-load-course.jar diff --git a/http-client.env.json b/http-client.env.json new file mode 100644 index 000000000..7373b4a7e --- /dev/null +++ b/http-client.env.json @@ -0,0 +1,6 @@ +{ + "vars": { + "serviceName": "cas-m3404-05", + "token": "7baFVCuQJk1b8qC1yN" + } +} \ No newline at end of file diff --git a/src/main/kotlin/ru/quipy/apigateway/APIController.kt b/src/main/kotlin/ru/quipy/apigateway/APIController.kt index 251b8cdbc..683cbbff9 100644 --- a/src/main/kotlin/ru/quipy/apigateway/APIController.kt +++ b/src/main/kotlin/ru/quipy/apigateway/APIController.kt @@ -3,9 +3,13 @@ package ru.quipy.apigateway import org.slf4j.Logger import org.slf4j.LoggerFactory import org.springframework.web.bind.annotation.* +import ru.quipy.common.utils.TokenBucketRateLimiter +import ru.quipy.exceptions.DeadlineExceededException +import ru.quipy.exceptions.TooManyRequestsException import ru.quipy.orders.repository.OrderRepository import ru.quipy.payments.logic.OrderPayer import java.util.* +import java.util.concurrent.TimeUnit @RestController class APIController(private val orderRepository: OrderRepository, private val orderPayer: OrderPayer) { @@ -47,8 +51,24 @@ class APIController(private val orderRepository: OrderRepository, private val or PAID, } + private val tokenBucketRateLimiter: TokenBucketRateLimiter by lazy { + TokenBucketRateLimiter( + rate = 11, + bucketMaxCapacity = 140, + startBucket = 140, + window = 1000, + timeUnit = TimeUnit.MILLISECONDS, + ) + } + @PostMapping("/orders/{orderId}/payment") fun payOrder(@PathVariable orderId: UUID, @RequestParam deadline: Long): PaymentSubmissionDto { + + if (!tokenBucketRateLimiter.tick()) { + throw TooManyRequestsException(retryAfterMillisecond = 30) + } + + logger.info("Trying to pay order $orderId : $deadline") val paymentId = UUID.randomUUID() val order = orderRepository.findById(orderId)?.let { orderRepository.save(it.copy(status = OrderStatus.PAYMENT_IN_PROGRESS)) diff --git a/src/main/kotlin/ru/quipy/apigateway/GlobalExceptionHandler.kt b/src/main/kotlin/ru/quipy/apigateway/GlobalExceptionHandler.kt index e32911ade..2231c36e4 100644 --- a/src/main/kotlin/ru/quipy/apigateway/GlobalExceptionHandler.kt +++ b/src/main/kotlin/ru/quipy/apigateway/GlobalExceptionHandler.kt @@ -5,21 +5,35 @@ import org.springframework.http.HttpStatus import org.springframework.http.ResponseEntity import org.springframework.web.bind.annotation.ExceptionHandler import org.springframework.web.bind.annotation.RestControllerAdvice +import ru.quipy.core.EventSourcingService +import ru.quipy.exceptions.DeadlineExceededException import ru.quipy.exceptions.TooManyRequestsException +import ru.quipy.payments.api.PaymentAggregate +import ru.quipy.payments.logic.PaymentAggregateState +import ru.quipy.payments.logic.logProcessing +import ru.quipy.payments.logic.now +import java.util.UUID @RestControllerAdvice -class GlobalExceptionHandler( - private val maxWait: String = "3", -) { +class GlobalExceptionHandler { companion object { val logger = LoggerFactory.getLogger(GlobalExceptionHandler::class.java) } @ExceptionHandler(TooManyRequestsException::class) - fun handleTooManyRequests(): ResponseEntity { + fun handleTooManyRequests(ex: TooManyRequestsException): ResponseEntity { + val wait = ex.retryAfterMillisecond return ResponseEntity .status(HttpStatus.TOO_MANY_REQUESTS) - .header("Retry-After", maxWait) + .header("Retry-After", wait.toString()) .build() } -} \ No newline at end of file + + @ExceptionHandler(DeadlineExceededException::class) + fun handleUnprocessableEntity(): ResponseEntity { + + return ResponseEntity + .status(HttpStatus.OK) + .build() + } +} diff --git a/src/main/kotlin/ru/quipy/common/utils/SlidingWindowRateLimiter.kt b/src/main/kotlin/ru/quipy/common/utils/SlidingWindowRateLimiter.kt index 1d86848e2..04c6abc79 100644 --- a/src/main/kotlin/ru/quipy/common/utils/SlidingWindowRateLimiter.kt +++ b/src/main/kotlin/ru/quipy/common/utils/SlidingWindowRateLimiter.kt @@ -37,6 +37,23 @@ class SlidingWindowRateLimiter( } } + fun tickBlocking(timeout: Duration): Boolean { + val deadline = System.currentTimeMillis() + timeout.toMillis() + + while (System.currentTimeMillis() < deadline) { + if (tick()) { + return true + } + + val remainingTime = deadline - System.currentTimeMillis() + if (remainingTime > 0) { + Thread.sleep(minOf(10, remainingTime)) + } + } + + return false + } + data class Measure( val value: Long, val timestamp: Long diff --git a/src/main/kotlin/ru/quipy/common/utils/TokenBucketRateLimiter.kt b/src/main/kotlin/ru/quipy/common/utils/TokenBucketRateLimiter.kt index a0791607f..2df8ec5a5 100644 --- a/src/main/kotlin/ru/quipy/common/utils/TokenBucketRateLimiter.kt +++ b/src/main/kotlin/ru/quipy/common/utils/TokenBucketRateLimiter.kt @@ -13,6 +13,7 @@ import java.util.concurrent.atomic.AtomicInteger class TokenBucketRateLimiter( private val rate: Int, private val bucketMaxCapacity: Int, + private val startBucket: Int, private val window: Long, private val timeUnit: TimeUnit = TimeUnit.MINUTES, ) : RateLimiter { @@ -22,7 +23,7 @@ class TokenBucketRateLimiter( private val rateLimiterScope = CoroutineScope(Executors.newSingleThreadExecutor().asCoroutineDispatcher()) - private var bucket: AtomicInteger = AtomicInteger(0) + private var bucket: AtomicInteger = AtomicInteger(startBucket) private var start = System.currentTimeMillis() private var nextExpectedWakeUp = start + timeUnit.toMillis(window) diff --git a/src/main/kotlin/ru/quipy/exceptions/DeadlineExceededException.kt b/src/main/kotlin/ru/quipy/exceptions/DeadlineExceededException.kt new file mode 100644 index 000000000..8bd50330d --- /dev/null +++ b/src/main/kotlin/ru/quipy/exceptions/DeadlineExceededException.kt @@ -0,0 +1,5 @@ +package ru.quipy.exceptions + +import java.util.UUID + +class DeadlineExceededException() : RuntimeException() \ No newline at end of file diff --git a/src/main/kotlin/ru/quipy/exceptions/TooManyRequestsException.kt b/src/main/kotlin/ru/quipy/exceptions/TooManyRequestsException.kt index 8d887d9f7..6d1756c92 100644 --- a/src/main/kotlin/ru/quipy/exceptions/TooManyRequestsException.kt +++ b/src/main/kotlin/ru/quipy/exceptions/TooManyRequestsException.kt @@ -1,3 +1,3 @@ package ru.quipy.exceptions -class TooManyRequestsException() : RuntimeException() \ No newline at end of file +class TooManyRequestsException(val retryAfterMillisecond: Long) : RuntimeException() \ No newline at end of file diff --git a/src/main/kotlin/ru/quipy/payments/config/PaymentAccountsConfig.kt b/src/main/kotlin/ru/quipy/payments/config/PaymentAccountsConfig.kt index 2842a405f..4d3eda6da 100644 --- a/src/main/kotlin/ru/quipy/payments/config/PaymentAccountsConfig.kt +++ b/src/main/kotlin/ru/quipy/payments/config/PaymentAccountsConfig.kt @@ -17,6 +17,7 @@ import java.net.http.HttpClient import java.net.http.HttpRequest import java.net.http.HttpResponse import java.util.* +import java.util.concurrent.Semaphore @Configuration @@ -60,7 +61,8 @@ class PaymentAccountsConfig { it, paymentService, paymentProviderHostPort, - token + token, + Semaphore(it.parallelRequests) ) } } diff --git a/src/main/kotlin/ru/quipy/payments/dto/Transaction.kt b/src/main/kotlin/ru/quipy/payments/dto/Transaction.kt index ce491998a..799949987 100644 --- a/src/main/kotlin/ru/quipy/payments/dto/Transaction.kt +++ b/src/main/kotlin/ru/quipy/payments/dto/Transaction.kt @@ -3,6 +3,12 @@ package ru.quipy.payments.dto import kotlinx.coroutines.Runnable import java.util.UUID -data class Transaction(val orderId: UUID, val amount: Int, val paymentId: UUID, val deadline: Long, val task : Runnable) : Runnable { +data class Transaction( + val orderId: UUID, + val amount: Int, + val paymentId: UUID, + val deadline: Long, + val task: Runnable +) : Runnable { override fun run() = task.run() } \ No newline at end of file diff --git a/src/main/kotlin/ru/quipy/payments/logic/OrderPayer.kt b/src/main/kotlin/ru/quipy/payments/logic/OrderPayer.kt index 2e8320429..f01abfbb3 100644 --- a/src/main/kotlin/ru/quipy/payments/logic/OrderPayer.kt +++ b/src/main/kotlin/ru/quipy/payments/logic/OrderPayer.kt @@ -9,6 +9,7 @@ import org.springframework.stereotype.Service import ru.quipy.common.utils.NamedThreadFactory import ru.quipy.common.utils.SlidingWindowRateLimiter import ru.quipy.core.EventSourcingService +import ru.quipy.exceptions.DeadlineExceededException import ru.quipy.exceptions.TooManyRequestsException import ru.quipy.payments.api.PaymentAggregate import ru.quipy.payments.dto.Transaction @@ -37,57 +38,32 @@ class OrderPayer( private val paymentExecutor: ThreadPoolExecutor by lazy { ThreadPoolExecutor( - accountProperties.parallelRequests, - accountProperties.parallelRequests, + 16, + 16, 0L, TimeUnit.MILLISECONDS, - ArrayBlockingQueue(accountProperties.parallelRequests), + ArrayBlockingQueue(8_000), NamedThreadFactory("payment-submission-executor"), ThreadPoolExecutor.AbortPolicy() ) } - private val rateLimit: SlidingWindowRateLimiter by lazy { - SlidingWindowRateLimiter( - rate = accountProperties.rateLimitPerSec.toLong(), - window = Duration.ofSeconds(1), - ) - } - fun processPayment(orderId: UUID, amount: Int, paymentId: UUID, deadline: Long): Long { val createdAt = System.currentTimeMillis() - paymentProcessingPlannedCounter.increment() - - val task = Runnable { - parallelLimiter.acquire() - while (!rateLimit.tick()) { - Thread.sleep(Random().nextInt(0, 10).toLong()) - } - paymentProcessingStartedCounter.increment() - try { - val createdEvent = paymentESService.create { - it.create(paymentId, orderId, amount) - } - logger.trace("Payment {} for order {} created.", createdEvent.paymentId, orderId) - paymentService.submitPaymentRequest( + paymentExecutor.submit { + val createdEvent = paymentESService.create { + it.create( paymentId, - amount, - createdAt, - deadline + orderId, + amount ) - } finally { - parallelLimiter.release() - paymentProcessingCompletedCounter.increment() } - } - val transaction = Transaction(orderId, amount, paymentId, deadline, task) + logger.trace("Payment {} for order {} created.", createdEvent.paymentId, orderId) - try { - paymentExecutor.execute(transaction) - return createdAt - } catch (_: RejectedExecutionException) { - throw TooManyRequestsException() + paymentService.submitPaymentRequest(paymentId, amount, createdAt, deadline) } + + return createdAt } } \ No newline at end of file diff --git a/src/main/kotlin/ru/quipy/payments/logic/PaymentExternalServiceImpl.kt b/src/main/kotlin/ru/quipy/payments/logic/PaymentExternalServiceImpl.kt index c0f4f84f5..5b3ee0211 100644 --- a/src/main/kotlin/ru/quipy/payments/logic/PaymentExternalServiceImpl.kt +++ b/src/main/kotlin/ru/quipy/payments/logic/PaymentExternalServiceImpl.kt @@ -6,19 +6,23 @@ import okhttp3.OkHttpClient import okhttp3.Request import okhttp3.RequestBody import org.slf4j.LoggerFactory +import org.springframework.beans.factory.annotation.Qualifier +import ru.quipy.common.utils.SlidingWindowRateLimiter import ru.quipy.core.EventSourcingService import ru.quipy.payments.api.PaymentAggregate import java.net.SocketTimeoutException import java.time.Duration import java.util.* +import java.util.concurrent.Semaphore +import java.util.concurrent.TimeUnit - -// Advice: always treat time as a Duration class PaymentExternalSystemAdapterImpl( private val properties: PaymentAccountProperties, private val paymentESService: EventSourcingService, private val paymentProviderHostPort: String, private val token: String, + @field:Qualifier("parallelLimiter") + private val parallelLimiter: Semaphore, ) : PaymentExternalSystemAdapter { companion object { @@ -34,6 +38,11 @@ class PaymentExternalSystemAdapterImpl( private val rateLimitPerSec = properties.rateLimitPerSec private val parallelRequests = properties.parallelRequests + private val slidingWindowRateLimiter = SlidingWindowRateLimiter( + rate = rateLimitPerSec.toLong(), + window = Duration.ofMillis(1_000) + ) + private val client = OkHttpClient.Builder().build() override fun getAccountProperties(): PaymentAccountProperties { @@ -54,17 +63,56 @@ class PaymentExternalSystemAdapterImpl( logger.info("[$accountName] Submit: $paymentId , txId: $transactionId") try { + val parallelLimiterTimeout = calculateRemainingTime(deadline, requestAverageProcessingTime.toMillis()) + if (parallelLimiterTimeout <= 0 || + !parallelLimiter.tryAcquire(parallelLimiterTimeout, TimeUnit.MILLISECONDS)) { + + logger.warn("[$accountName] Parallel limiter timeout for payment $paymentId") + paymentESService.update(paymentId) { + it.logProcessing(false, now(), transactionId) + } + return + } + + val rateLimiterTimeout = calculateRemainingTime(deadline, requestAverageProcessingTime.toMillis()) + if (rateLimiterTimeout <= 0 || !slidingWindowRateLimiter.tickBlocking(Duration.ofMillis(rateLimiterTimeout))) { + + logger.warn("[$accountName] Rate limiter timeout for payment $paymentId") + paymentESService.update(paymentId) { + it.logProcessing(false, now(), transactionId) + } + return + } + val request = Request.Builder().run { url("http://$paymentProviderHostPort/external/process?serviceName=$serviceName&token=$token&accountName=$accountName&transactionId=$transactionId&paymentId=$paymentId&amount=$amount") post(emptyBody) - }.build() + build() + } + + val httpTimeout = calculateRemainingTime(deadline, requestAverageProcessingTime.toMillis()) + if (httpTimeout <= 0) { + logger.warn("[$accountName] HTTP request timeout before execution for payment $paymentId") + paymentESService.update(paymentId) { + it.logProcessing(false, now(), transactionId, reason = "Request timeout before execution") + } + return + } + + val httpClient = OkHttpClient.Builder() + .connectTimeout(httpTimeout, TimeUnit.MILLISECONDS) + .readTimeout(httpTimeout, TimeUnit.MILLISECONDS) + .writeTimeout(httpTimeout, TimeUnit.MILLISECONDS) + .build() - client.newCall(request).execute().use { response -> + httpClient.newCall(request).execute().use { response -> val body = try { - mapper.readValue(response.body?.string(), ExternalSysResponse::class.java) + response.body?.string()?.let { + mapper.readValue(it, ExternalSysResponse::class.java) + } ?: ExternalSysResponse(transactionId.toString(), paymentId.toString(), false, "Empty response body") } catch (e: Exception) { logger.error("[$accountName] [ERROR] Payment processed for txId: $transactionId, payment: $paymentId, result code: ${response.code}, reason: ${response.body?.string()}") - ExternalSysResponse(transactionId.toString(), paymentId.toString(), false, e.message) + ExternalSysResponse(transactionId.toString(), paymentId.toString(), false, e.message ?: "Unknown error") } logger.warn("[$accountName] Payment processed for txId: $transactionId, payment: $paymentId, succeeded: ${body.result}, message: ${body.message}") @@ -80,18 +128,20 @@ class PaymentExternalSystemAdapterImpl( is SocketTimeoutException -> { logger.error("[$accountName] Payment timeout for txId: $transactionId, payment: $paymentId", e) paymentESService.update(paymentId) { - it.logProcessing(false, now(), transactionId, reason = "Request timeout.") + it.logProcessing(false, now(), transactionId, reason = "HTTP request timeout") } } - else -> { logger.error("[$accountName] Payment failed for txId: $transactionId, payment: $paymentId", e) - paymentESService.update(paymentId) { - it.logProcessing(false, now(), transactionId, reason = e.message) + it.logProcessing(false, now(), transactionId, reason = e.message ?: "Unknown error") } } } + } finally { + if (parallelLimiter.availablePermits() < parallelRequests) { + parallelLimiter.release() + } } } @@ -101,6 +151,10 @@ class PaymentExternalSystemAdapterImpl( override fun name() = properties.accountName + private fun calculateRemainingTime(deadline: Long, requestAverageProcessingTime: Long): Long { + return deadline - now() - (requestAverageProcessingTime * 0.01).toLong() + } + } public fun now() = System.currentTimeMillis() \ No newline at end of file diff --git a/test-local-run.http b/test-local-run.http index 743f0dabd..fd91a5b1e 100644 --- a/test-local-run.http +++ b/test-local-run.http @@ -5,31 +5,42 @@ Content-Type: application/json { "serviceName": "{{serviceName}}", "token": "{{token}}", - "ratePerSecond": 16, - "testCount": 16000, - "processingTimeMillis": 30000 + "ratePerSecond": 11, + "testCount": 2200, + "processingTimeMillis": 13000, + "profile": "s_0.7_60" } ### Stop running test to save time and resources # @timeout 120 POST http://localhost:1234/test/stop/{{serviceName}} -# PaymentAccountProperties( -# serviceName=cas-m3404-05, -# accountName=acc-3, -# parallelRequests=30, -# rateLimitPerSec=10, -# price=30, -# averageProcessingTime=PT1S, -# enabled=true -# ) -# PaymentAccountProperties( -# serviceName=cas-m3404-05, -# accountName=acc-5, -# parallelRequests=5, -# rateLimitPerSec=3, -# price=30, -# averageProcessingTime=PT4.9S, -# enabled=true -#) +#Test #1 +#{ +# "serviceName": "{{serviceName}}", +# "token": "{{token}}", +# "ratePerSecond": 16, +# "testCount": 3000, +# "processingTimeMillis": 2500 +#} + +#Test #2 +#{ +# "serviceName": "{{serviceName}}", +# "token": "{{token}}", +# "ratePerSecond": 11, +# "testCount": 2200, +# "processingTimeMillis": 13000, +# "profile": "s_0.7_60" +#} + +#Test #3 +#{ +# "serviceName": "{{serviceName}}", +# "token": "{{token}}", +# "ratePerSecond": 3, +# "testCount": 1050, +# "processingTimeMillis": 26000, +# "runits": 90 +#} \ No newline at end of file diff --git a/test-on-prem-run.http b/test-on-prem-run.http index 584edc0b5..19f2bb9bb 100644 --- a/test-on-prem-run.http +++ b/test-on-prem-run.http @@ -6,11 +6,11 @@ Content-Type: application/json { "serviceName": "{{serviceName}}", "token": "{{token}}", - "branch": "main", - "accounts": "acc-3", + "branch": "feature/kuro-lab-5-2", + "accounts": "acc-23", "ratePerSecond": 11, - "testCount": 1200, - "processingTimeMillis": 80000, + "testCount": 2200, + "processingTimeMillis": 13000, "onPremises": true }