Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 13 additions & 1 deletion .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,19 @@ target/
!**/src/main/**/target/
!**/src/test/**/target/

### grafana ###
/grafana/

### prometheus ###

/prometheus/

### confidential data ###

http-client.env.json



### STS ###
.apt_generated
.classpath
Expand All @@ -18,7 +31,6 @@ target/
*.iws
*.iml
*.ipr
./http-client.env.json

### NetBeans ###
/nbproject/private/
Expand Down
2 changes: 1 addition & 1 deletion Dockerfile
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ RUN mvn dependency:go-offline
COPY src src
RUN mvn package

FROM openjdk:17-jdk-slim
FROM eclipse-temurin:17-alpine-3.22

COPY --from=build /app/target/*.jar /high-load-course.jar

Expand Down
6 changes: 6 additions & 0 deletions http-client.env.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
{
"vars": {
"serviceName": "cas-m3404-05",
"token": "7baFVCuQJk1b8qC1yN"
}
}
20 changes: 20 additions & 0 deletions src/main/kotlin/ru/quipy/apigateway/APIController.kt
Original file line number Diff line number Diff line change
Expand Up @@ -3,9 +3,13 @@ package ru.quipy.apigateway
import org.slf4j.Logger
import org.slf4j.LoggerFactory
import org.springframework.web.bind.annotation.*
import ru.quipy.common.utils.TokenBucketRateLimiter
import ru.quipy.exceptions.DeadlineExceededException
import ru.quipy.exceptions.TooManyRequestsException
import ru.quipy.orders.repository.OrderRepository
import ru.quipy.payments.logic.OrderPayer
import java.util.*
import java.util.concurrent.TimeUnit

@RestController
class APIController(private val orderRepository: OrderRepository, private val orderPayer: OrderPayer) {
Expand Down Expand Up @@ -47,8 +51,24 @@ class APIController(private val orderRepository: OrderRepository, private val or
PAID,
}

private val tokenBucketRateLimiter: TokenBucketRateLimiter by lazy {
TokenBucketRateLimiter(
rate = 11,
bucketMaxCapacity = 140,
startBucket = 140,
window = 1000,
timeUnit = TimeUnit.MILLISECONDS,
)
}

@PostMapping("/orders/{orderId}/payment")
fun payOrder(@PathVariable orderId: UUID, @RequestParam deadline: Long): PaymentSubmissionDto {

if (!tokenBucketRateLimiter.tick()) {
throw TooManyRequestsException(retryAfterMillisecond = 30)
}

logger.info("Trying to pay order $orderId : $deadline")
val paymentId = UUID.randomUUID()
val order = orderRepository.findById(orderId)?.let {
orderRepository.save(it.copy(status = OrderStatus.PAYMENT_IN_PROGRESS))
Expand Down
26 changes: 20 additions & 6 deletions src/main/kotlin/ru/quipy/apigateway/GlobalExceptionHandler.kt
Original file line number Diff line number Diff line change
Expand Up @@ -5,21 +5,35 @@ import org.springframework.http.HttpStatus
import org.springframework.http.ResponseEntity
import org.springframework.web.bind.annotation.ExceptionHandler
import org.springframework.web.bind.annotation.RestControllerAdvice
import ru.quipy.core.EventSourcingService
import ru.quipy.exceptions.DeadlineExceededException
import ru.quipy.exceptions.TooManyRequestsException
import ru.quipy.payments.api.PaymentAggregate
import ru.quipy.payments.logic.PaymentAggregateState
import ru.quipy.payments.logic.logProcessing
import ru.quipy.payments.logic.now
import java.util.UUID

@RestControllerAdvice
class GlobalExceptionHandler(
private val maxWait: String = "3",
) {
class GlobalExceptionHandler {
companion object {
val logger = LoggerFactory.getLogger(GlobalExceptionHandler::class.java)
}

@ExceptionHandler(TooManyRequestsException::class)
fun handleTooManyRequests(): ResponseEntity<String> {
fun handleTooManyRequests(ex: TooManyRequestsException): ResponseEntity<String> {
val wait = ex.retryAfterMillisecond
return ResponseEntity
.status(HttpStatus.TOO_MANY_REQUESTS)
.header("Retry-After", maxWait)
.header("Retry-After", wait.toString())
.build()
}
}

@ExceptionHandler(DeadlineExceededException::class)
fun handleUnprocessableEntity(): ResponseEntity<String> {

return ResponseEntity
.status(HttpStatus.OK)
.build()
}
}
17 changes: 17 additions & 0 deletions src/main/kotlin/ru/quipy/common/utils/SlidingWindowRateLimiter.kt
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,23 @@ class SlidingWindowRateLimiter(
}
}

fun tickBlocking(timeout: Duration): Boolean {
val deadline = System.currentTimeMillis() + timeout.toMillis()

while (System.currentTimeMillis() < deadline) {
if (tick()) {
return true
}

val remainingTime = deadline - System.currentTimeMillis()
if (remainingTime > 0) {
Thread.sleep(minOf(10, remainingTime))
}
}

return false
}

data class Measure(
val value: Long,
val timestamp: Long
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ import java.util.concurrent.atomic.AtomicInteger
class TokenBucketRateLimiter(
private val rate: Int,
private val bucketMaxCapacity: Int,
private val startBucket: Int,
private val window: Long,
private val timeUnit: TimeUnit = TimeUnit.MINUTES,
) : RateLimiter {
Expand All @@ -22,7 +23,7 @@ class TokenBucketRateLimiter(

private val rateLimiterScope = CoroutineScope(Executors.newSingleThreadExecutor().asCoroutineDispatcher())

private var bucket: AtomicInteger = AtomicInteger(0)
private var bucket: AtomicInteger = AtomicInteger(startBucket)
private var start = System.currentTimeMillis()
private var nextExpectedWakeUp = start + timeUnit.toMillis(window)

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
package ru.quipy.exceptions

import java.util.UUID

class DeadlineExceededException() : RuntimeException()
Original file line number Diff line number Diff line change
@@ -1,3 +1,3 @@
package ru.quipy.exceptions

class TooManyRequestsException() : RuntimeException()
class TooManyRequestsException(val retryAfterMillisecond: Long) : RuntimeException()
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ import java.net.http.HttpClient
import java.net.http.HttpRequest
import java.net.http.HttpResponse
import java.util.*
import java.util.concurrent.Semaphore


@Configuration
Expand Down Expand Up @@ -60,7 +61,8 @@ class PaymentAccountsConfig {
it,
paymentService,
paymentProviderHostPort,
token
token,
Semaphore(it.parallelRequests)
)
}
}
Expand Down
8 changes: 7 additions & 1 deletion src/main/kotlin/ru/quipy/payments/dto/Transaction.kt
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,12 @@ package ru.quipy.payments.dto
import kotlinx.coroutines.Runnable
import java.util.UUID

data class Transaction(val orderId: UUID, val amount: Int, val paymentId: UUID, val deadline: Long, val task : Runnable) : Runnable {
data class Transaction(
val orderId: UUID,
val amount: Int,
val paymentId: UUID,
val deadline: Long,
val task: Runnable
) : Runnable {
override fun run() = task.run()
}
50 changes: 13 additions & 37 deletions src/main/kotlin/ru/quipy/payments/logic/OrderPayer.kt
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import org.springframework.stereotype.Service
import ru.quipy.common.utils.NamedThreadFactory
import ru.quipy.common.utils.SlidingWindowRateLimiter
import ru.quipy.core.EventSourcingService
import ru.quipy.exceptions.DeadlineExceededException
import ru.quipy.exceptions.TooManyRequestsException
import ru.quipy.payments.api.PaymentAggregate
import ru.quipy.payments.dto.Transaction
Expand Down Expand Up @@ -37,57 +38,32 @@ class OrderPayer(

private val paymentExecutor: ThreadPoolExecutor by lazy {
ThreadPoolExecutor(
accountProperties.parallelRequests,
accountProperties.parallelRequests,
16,
16,
0L,
TimeUnit.MILLISECONDS,
ArrayBlockingQueue<Runnable>(accountProperties.parallelRequests),
ArrayBlockingQueue<Runnable>(8_000),
NamedThreadFactory("payment-submission-executor"),
ThreadPoolExecutor.AbortPolicy()
)
}

private val rateLimit: SlidingWindowRateLimiter by lazy {
SlidingWindowRateLimiter(
rate = accountProperties.rateLimitPerSec.toLong(),
window = Duration.ofSeconds(1),
)
}

fun processPayment(orderId: UUID, amount: Int, paymentId: UUID, deadline: Long): Long {
val createdAt = System.currentTimeMillis()
paymentProcessingPlannedCounter.increment()

val task = Runnable {
parallelLimiter.acquire()
while (!rateLimit.tick()) {
Thread.sleep(Random().nextInt(0, 10).toLong())
}
paymentProcessingStartedCounter.increment()
try {
val createdEvent = paymentESService.create {
it.create(paymentId, orderId, amount)
}
logger.trace("Payment {} for order {} created.", createdEvent.paymentId, orderId)
paymentService.submitPaymentRequest(
paymentExecutor.submit {
val createdEvent = paymentESService.create {
it.create(
paymentId,
amount,
createdAt,
deadline
orderId,
amount
)
} finally {
parallelLimiter.release()
paymentProcessingCompletedCounter.increment()
}
}

val transaction = Transaction(orderId, amount, paymentId, deadline, task)
logger.trace("Payment {} for order {} created.", createdEvent.paymentId, orderId)

try {
paymentExecutor.execute(transaction)
return createdAt
} catch (_: RejectedExecutionException) {
throw TooManyRequestsException()
paymentService.submitPaymentRequest(paymentId, amount, createdAt, deadline)
}

return createdAt
}
}
Loading