Skip to content
Open

hw 9 #20

Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 3 additions & 2 deletions src/main/kotlin/ru/quipy/OnlineShopApplication.kt
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
package ru.quipy

import kotlinx.coroutines.asCoroutineDispatcher
import org.slf4j.Logger
import org.slf4j.LoggerFactory
import org.springframework.boot.autoconfigure.SpringBootApplication
Expand All @@ -13,10 +14,10 @@ class OnlineShopApplication {
val log: Logger = LoggerFactory.getLogger(OnlineShopApplication::class.java)

companion object {
val appExecutor = Executors.newFixedThreadPool(64, NamedThreadFactory("main-app-executor"))
val appExecutor = Executors.newFixedThreadPool(20_000, NamedThreadFactory("main-app-executor")).asCoroutineDispatcher()
}
}

fun main(args: Array<String>) {
suspend fun main(args: Array<String>) {
runApplication<OnlineShopApplication>(*args)
}
8 changes: 4 additions & 4 deletions src/main/kotlin/ru/quipy/apigateway/APIController.kt
Original file line number Diff line number Diff line change
Expand Up @@ -17,13 +17,13 @@ class APIController(
private val orderRepository: OrderRepository,
private val orderPayer: OrderPayer,
@field:Qualifier("parallelLimiter")
private val rateLimiter: RateLimiter = LeakingBucketRateLimiter(8, Duration.ofSeconds(1), 30)
private val rateLimiter: RateLimiter = LeakingBucketRateLimiter(1100, Duration.ofSeconds(1), 3300)
) {

val logger: Logger = LoggerFactory.getLogger(APIController::class.java)

@PostMapping("/users")
fun createUser(@RequestBody req: CreateUserRequest): User {
suspend fun createUser(@RequestBody req: CreateUserRequest): User {
return User(UUID.randomUUID(), req.name)
}

Expand All @@ -32,7 +32,7 @@ class APIController(
data class User(val id: UUID, val name: String)

@PostMapping("/orders")
fun createOrder(@RequestParam userId: UUID, @RequestParam price: Int): Order {
suspend fun createOrder(@RequestParam userId: UUID, @RequestParam price: Int): Order {
val order = Order(
UUID.randomUUID(),
userId,
Expand All @@ -58,7 +58,7 @@ class APIController(
}

@PostMapping("/orders/{orderId}/payment")
fun payOrder(@PathVariable orderId: UUID, @RequestParam deadline: Long): PaymentSubmissionDto {
suspend fun payOrder(@PathVariable orderId: UUID, @RequestParam deadline: Long): PaymentSubmissionDto {
if (!rateLimiter.tick()) {
throw TooManyRequestsException(deadline)
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,10 +39,10 @@ class SlidingWindowRateLimiter(
}
}

fun tickBlocking(timeout: Long): Boolean {
suspend fun tickBlocking(timeout: Long): Boolean {
val timeStarted = System.currentTimeMillis()
while (System.currentTimeMillis()-timeStarted < timeout && !tick()) {
Thread.sleep(2)
delay(2)
}
return System.currentTimeMillis()-timeStarted < timeout
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ class PaymentSubscriber {
retryConf = RetryConf(1, RetryFailedStrategy.SKIP_EVENT)
) {
`when`(PaymentProcessedEvent::class) { event ->
appExecutor.submit {
appExecutor.run {
logger.trace(
"Payment results. OrderId ${event.orderId}, succeeded: ${event.success}, txId: ${event.transactionId}, reason: ${event.reason}, duration: ${
Duration.ofMillis(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import com.fasterxml.jackson.databind.ObjectMapper
import com.fasterxml.jackson.datatype.jsr310.JavaTimeModule
import com.fasterxml.jackson.module.kotlin.registerKotlinModule
import io.micrometer.core.instrument.MeterRegistry
import kotlinx.coroutines.sync.Semaphore
import org.springframework.beans.factory.annotation.Value
import org.springframework.context.annotation.Bean
import org.springframework.context.annotation.Configuration
Expand All @@ -18,7 +19,6 @@ import java.net.http.HttpClient
import java.net.http.HttpRequest
import java.net.http.HttpResponse
import java.util.*
import java.util.concurrent.Semaphore


@Configuration
Expand Down
46 changes: 12 additions & 34 deletions src/main/kotlin/ru/quipy/payments/logic/OrderPayer.kt
Original file line number Diff line number Diff line change
@@ -1,68 +1,46 @@
package ru.quipy.payments.logic

import io.micrometer.core.instrument.MeterRegistry
import org.slf4j.Logger
import org.slf4j.LoggerFactory
import org.springframework.beans.factory.annotation.Autowired
import org.springframework.stereotype.Service
import ru.quipy.common.utils.CallerBlockingRejectedExecutionHandler
import ru.quipy.common.utils.NamedThreadFactory
import ru.quipy.common.utils.SlidingWindowRateLimiter
import ru.quipy.core.EventSourcingService
import ru.quipy.exceptions.RateLimitWasBreached
import ru.quipy.exceptions.TooManyRequestsException
import ru.quipy.payments.api.PaymentAggregate
import java.time.Duration
import java.util.*
import java.util.concurrent.LinkedBlockingQueue
import java.util.concurrent.ThreadPoolExecutor
import java.util.concurrent.TimeUnit

@Service
class OrderPayer {
class OrderPayer(meterRegistry: MeterRegistry) {

companion object {
val logger: Logger = LoggerFactory.getLogger(OrderPayer::class.java)
}
private val rateLimit: SlidingWindowRateLimiter by lazy {
SlidingWindowRateLimiter(
rate = 8,
window = Duration.ofMillis(1000),
)
}

private val plannedRequests = meterRegistry.counter("payment.processing.planned", "accountName", "acc-12")


@Autowired
private lateinit var paymentESService: EventSourcingService<UUID, PaymentAggregate, PaymentAggregateState>

@Autowired
private lateinit var paymentService: PaymentService

private val paymentExecutor = ThreadPoolExecutor(
16,
16,
0,
TimeUnit.MILLISECONDS,
LinkedBlockingQueue(8_000),
NamedThreadFactory("payment-submission-executor"),
CallerBlockingRejectedExecutionHandler()
)

fun processPayment(orderId: UUID, amount: Int, paymentId: UUID, deadline: Long): Long {
suspend fun processPayment(orderId: UUID, amount: Int, paymentId: UUID, deadline: Long): Long {
val createdAt = System.currentTimeMillis()
if (!rateLimit.tickBlocking(deadline- System.currentTimeMillis())) {
throw TooManyRequestsException(deadline)
}
paymentExecutor.submit {
val createdEvent = paymentESService.create {
plannedRequests.increment()
val createdEvent = paymentESService.create {
it.create(
paymentId,
orderId,
amount
)
}
logger.trace("Payment ${createdEvent.paymentId} for order $orderId created.")

paymentService.submitPaymentRequest(paymentId, amount, createdAt, deadline)
}
logger.trace("Payment {} for order {} created.", createdEvent.paymentId, orderId)

paymentService.submitPaymentRequest(paymentId, amount, createdAt, deadline)

return createdAt
}
}
102 changes: 102 additions & 0 deletions src/main/kotlin/ru/quipy/payments/logic/PaymentCallback.kt
Original file line number Diff line number Diff line change
@@ -0,0 +1,102 @@
package ru.quipy.payments.logic

import io.micrometer.core.instrument.Timer
import io.micrometer.core.instrument.Counter
import kotlinx.coroutines.sync.Semaphore
import okhttp3.Call
import okhttp3.Callback
import okhttp3.OkHttpClient
import okhttp3.Request
import okhttp3.Response
import ru.quipy.core.EventSourcingService
import ru.quipy.payments.api.PaymentAggregate
import ru.quipy.payments.logic.PaymentExternalSystemAdapterImpl.Companion.logger
import ru.quipy.payments.logic.PaymentExternalSystemAdapterImpl.Companion.mapper
import java.io.InterruptedIOException
import java.net.SocketTimeoutException
import java.util.UUID
import java.util.concurrent.TimeUnit
import kotlin.math.pow


class PaymentCallback(val startedRequestsCounter: Counter, val semaphore: Semaphore, val accountName: String, val retryCount: Int, val paymentId: UUID, val transactionId: UUID, val paymentESService: EventSourcingService<UUID, PaymentAggregate, PaymentAggregateState>, val client: OkHttpClient, val request: Request, val timer: Timer, val deadline: Long, val timeBeforeCall: Long) : Callback {
override fun onFailure(call: Call, e: java.io.IOException) {

logger.debug("fail in callback for payment: {}, retry count: {}, deadline: {}, in time: {}", paymentId, retryCount, deadline, now(), e)
when (e) {
is SocketTimeoutException -> {
logger.warn("[$accountName] attempt ${retryCount + 1} timeout: $paymentId", e)
paymentESService.update(paymentId) {
it.logProcessing(false, now(), transactionId, "socket timeout")
}
}

is InterruptedIOException -> {
logger.warn("[$accountName] interrupted: $paymentId", e)

// Здесь мы обновляем состояние оплаты в зависимости от результата в базе данных оплат.
// Это требуется сделать ВО ВСЕХ ИСХОДАХ (успешная оплата / неуспешная / ошибочная ситуация)
paymentESService.update(paymentId) {
it.logProcessing(false, now(), transactionId, "interrupted IO")
}
}

else -> {
logger.warn("[$accountName] io error: $paymentId", e)
paymentESService.update(paymentId) {
it.logProcessing(false, now(), transactionId, "io exception")
}
}
}

if (retryCount + 1 >= 3) {
paymentESService.update(paymentId) {
it.logProcessing(false, now(), transactionId, "Max attempts reached")
}
startedRequestsCounter.increment()
semaphore.release()
return
}

val backoff = ((2.0.pow(retryCount.toDouble()) * 25).toLong() + kotlin.random.Random.nextLong(10))
val capped = backoff.coerceAtMost(deadline - now() - 5)
if (capped <= 0) {
paymentESService.update(paymentId) {
it.logProcessing(false, now(), transactionId, "Deadline expired")
}
startedRequestsCounter.increment()
semaphore.release()
return
}
val nCall = client.newCall(request)
nCall.enqueue(PaymentCallback(startedRequestsCounter, semaphore, accountName, retryCount + 1, paymentId, transactionId, paymentESService, client, request, timer, deadline, now()))
timer.record(now() - timeBeforeCall, TimeUnit.MILLISECONDS)
}

override fun onResponse(call: Call, response: Response) {
startedRequestsCounter.increment()
try {
logger.warn("Free space in semaphore: {}", semaphore.availablePermits)
logger.info(
"success in callback for payment: {}, retry count: {}, deadline: {}, in time: {}",
paymentId,
retryCount,
deadline,
now()
)
val rawBody = response.body?.string()
val parsed = try {
mapper.readValue(rawBody, ExternalSysResponse::class.java)
} catch (ex: Exception) {
ExternalSysResponse(transactionId.toString(), paymentId.toString(), false, ex.message)
}

paymentESService.update(paymentId) {
it.logProcessing(parsed.result, now(), transactionId, parsed.message)
}
} finally {
semaphore.release()
timer.record(now() - timeBeforeCall, TimeUnit.MILLISECONDS)
}
}
}
Loading