Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 1 addition & 2 deletions src/main/kotlin/ru/quipy/apigateway/APIController.kt
Original file line number Diff line number Diff line change
Expand Up @@ -16,8 +16,7 @@ import java.util.*
class APIController(
private val orderRepository: OrderRepository,
private val orderPayer: OrderPayer,
@field:Qualifier("parallelLimiter")
private val rateLimiter: RateLimiter = LeakingBucketRateLimiter(8, Duration.ofSeconds(1), 30)
private val rateLimiter: LeakingBucketRateLimiter
) {

val logger: Logger = LoggerFactory.getLogger(APIController::class.java)
Expand Down
89 changes: 79 additions & 10 deletions src/main/kotlin/ru/quipy/payments/config/PaymentAccountsConfig.kt
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,10 @@ import io.micrometer.core.instrument.MeterRegistry
import org.springframework.beans.factory.annotation.Value
import org.springframework.context.annotation.Bean
import org.springframework.context.annotation.Configuration
import ru.quipy.common.utils.CallerBlockingRejectedExecutionHandler
import ru.quipy.common.utils.LeakingBucketRateLimiter
import ru.quipy.common.utils.NamedThreadFactory
import ru.quipy.common.utils.SlidingWindowRateLimiter
import ru.quipy.core.EventSourcingService
import ru.quipy.payments.api.PaymentAggregate
import ru.quipy.payments.logic.PaymentAccountProperties
Expand All @@ -17,8 +21,12 @@ import java.net.URI
import java.net.http.HttpClient
import java.net.http.HttpRequest
import java.net.http.HttpResponse
import java.time.Duration
import java.util.*
import java.util.concurrent.LinkedBlockingQueue
import java.util.concurrent.Semaphore
import java.util.concurrent.ThreadPoolExecutor
import java.util.concurrent.TimeUnit


@Configuration
Expand All @@ -28,6 +36,8 @@ class PaymentAccountsConfig {
private val mapper = ObjectMapper().registerKotlinModule().registerModules(JavaTimeModule())
}

var rateCheckWindow: Duration = Duration.ofMillis(1000)

@Value("\${payment.hostPort}")
lateinit var paymentProviderHostPort: String

Expand All @@ -41,23 +51,82 @@ class PaymentAccountsConfig {
lateinit var allowedAccounts: List<String>

@Bean
fun accountAdapters(
paymentService: EventSourcingService<UUID, PaymentAggregate, PaymentAggregateState>,
meterRegistry: MeterRegistry,
): List<PaymentExternalSystemAdapter> {
fun warehouseIfUnfinishedWork(
accountProperties: List<PaymentAccountProperties>,
@Value("\${payment.maximumPoolSize}")
maximumPoolSize: Int
): ThreadPoolExecutor {
val poolSize = 100
val temp = ThreadPoolExecutor(
poolSize,
poolSize,
0,
TimeUnit.MILLISECONDS,
LinkedBlockingQueue(maximumPoolSize),
NamedThreadFactory("payment-submission-executor"),
CallerBlockingRejectedExecutionHandler()
)
return temp
}

@Bean
fun parallelLimiter(
accountProperties: List<PaymentAccountProperties>
): Semaphore =
Semaphore(accountProperties.minOf { it.parallelRequests })

@Bean
fun burstRateLimiter(
accountProperties: List<PaymentAccountProperties>,
@Value("#{'\${payment.processingTimeMillis}'.split(',')}")
processingTimeMillis: Int
): LeakingBucketRateLimiter =
LeakingBucketRateLimiter(
rate = accountProperties.minOf { it.rateLimitPerSec }.toLong(),
window = rateCheckWindow,
bucketSize = (((processingTimeMillis - accountProperties.maxOf { it.averageProcessingTime }
.toMillis()) / accountProperties.maxOf { it.averageProcessingTime }
.toMillis()) * accountProperties.minOf { it.rateLimitPerSec }.toLong()).toInt()
)

@Bean
fun smoothOutIncoming(
accountProperties: List<PaymentAccountProperties>,
): SlidingWindowRateLimiter =
SlidingWindowRateLimiter(
rate = accountProperties.minOf { it.rateLimitPerSec }.toLong(),
window = rateCheckWindow,
)


@Bean
fun accountProperties(): List<PaymentAccountProperties> {
val request = HttpRequest.newBuilder()
.uri(URI("http://${paymentProviderHostPort}/external/accounts?serviceName=$serviceName&token=$token"))
.timeout(Duration.ofSeconds(10))
.GET()
.build()

val resp = javaClient.send(request, HttpResponse.BodyHandlers.ofString())

println("\nPayment accounts list:")
return mapper.readValue<List<PaymentAccountProperties>>(
val accounts: List<PaymentAccountProperties> = mapper.readValue<List<PaymentAccountProperties>>(
resp.body(),
mapper.typeFactory.constructCollectionType(List::class.java, PaymentAccountProperties::class.java)
)
.filter { it.accountName in allowedAccounts }
mapper.typeFactory.constructCollectionType(
List::class.java,
PaymentAccountProperties::class.java
)
).filter { it.accountName in allowedAccounts }

return accounts
}

@Bean
fun accountAdapters(
paymentService: EventSourcingService<UUID, PaymentAggregate, PaymentAggregateState>,
meterRegistry: MeterRegistry,
accountProperties: List<PaymentAccountProperties>
): List<PaymentExternalSystemAdapter> {
return accountProperties
.map { it.copy(enabled = true) }
.onEach(::println)
.map {
Expand All @@ -67,7 +136,7 @@ class PaymentAccountsConfig {
paymentProviderHostPort,
token,
meterRegistry,
Semaphore(it.parallelRequests)
parallelLimiter(accountProperties)
)
}
}
Expand Down
25 changes: 7 additions & 18 deletions src/main/kotlin/ru/quipy/payments/logic/OrderPayer.kt
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package ru.quipy.payments.logic
import org.slf4j.Logger
import org.slf4j.LoggerFactory
import org.springframework.beans.factory.annotation.Autowired
import org.springframework.beans.factory.annotation.Qualifier
import org.springframework.stereotype.Service
import ru.quipy.common.utils.CallerBlockingRejectedExecutionHandler
import ru.quipy.common.utils.NamedThreadFactory
Expand All @@ -18,37 +19,25 @@ import java.util.concurrent.ThreadPoolExecutor
import java.util.concurrent.TimeUnit

@Service
class OrderPayer {
class OrderPayer(
val rateLimiter: SlidingWindowRateLimiter,
@Qualifier("warehouseIfUnfinishedWork")
val paymentExecutor: ThreadPoolExecutor
) {

companion object {
val logger: Logger = LoggerFactory.getLogger(OrderPayer::class.java)
}
private val rateLimit: SlidingWindowRateLimiter by lazy {
SlidingWindowRateLimiter(
rate = 8,
window = Duration.ofMillis(1000),
)
}

@Autowired
private lateinit var paymentESService: EventSourcingService<UUID, PaymentAggregate, PaymentAggregateState>

@Autowired
private lateinit var paymentService: PaymentService

private val paymentExecutor = ThreadPoolExecutor(
16,
16,
0,
TimeUnit.MILLISECONDS,
LinkedBlockingQueue(8_000),
NamedThreadFactory("payment-submission-executor"),
CallerBlockingRejectedExecutionHandler()
)

fun processPayment(orderId: UUID, amount: Int, paymentId: UUID, deadline: Long): Long {
val createdAt = System.currentTimeMillis()
if (!rateLimit.tickBlocking(deadline- System.currentTimeMillis())) {
if (!rateLimiter.tickBlocking(deadline- System.currentTimeMillis())) {
throw TooManyRequestsException(deadline)
}
paymentExecutor.submit {
Expand Down
7 changes: 6 additions & 1 deletion src/main/resources/application.properties
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,11 @@ payment.service-name=${PAYMENT_SERVICE_NAME}
payment.token=${PAYMENT_TOKEN}
# payment.accounts=${PAYMENT_ACCOUNTS:acc-12,acc-20}
# payment.accounts=${PAYMENT_ACCOUNTS:acc-3}
payment.accounts=${PAYMENT_ACCOUNTS:acc-7}
#payment.accounts=${PAYMENT_ACCOUNTS:acc-7}
# payment.accounts=${PAYMENT_ACCOUNTS:acc-18}
payment.hostPort=${PAYMENT_HOST:localhost}:${PAYMENT_PORT:1234}

# For 8 case
payment.accounts=${PAYMENT_ACCOUNTS:acc-9}
payment.processingTimeMillis=20000
payment.maximumPoolSize=8000
36 changes: 31 additions & 5 deletions test-local-run.http
Original file line number Diff line number Diff line change
@@ -1,15 +1,41 @@
### Run test locally
### Test 6 run...
POST http://localhost:1234/test/run
Content-Type: application/json

{
"serviceName": "{{serviceName}}",
"token": "{{token}}",
"ratePerSecond": 1,
"testCount": 100,
"processingTimeMillis": 80000
"ratePerSecond": 100,
"testCount": 5000,
"processingTimeMillis": 20000
}

### Test 7 run...
POST http://localhost:1234/test/run
Content-Type: application/json

{
"serviceName": "{{serviceName}}",
"token": "{{token}}",
"ratePerSecond": 100,
"testCount": 5000,
"processingTimeMillis": 20000
}

### Test 8 run...
POST http://localhost:1234/test/run
Content-Type: application/json

{
"serviceName": "{{serviceName}}",
"token": "{{token}}",
"ratePerSecond": 100,
"testCount": 5000,
"processingTimeMillis": 20000
}



### Stop running test to save time and resources
# @timeout 120
POST http://localhost:4321/test/stop/"{{serviceName}}"
POST http://localhost:4321/test/stop/"{{serviceName}}"