Skip to content
5 changes: 5 additions & 0 deletions src/dist/cfg/log4j2.xml
Original file line number Diff line number Diff line change
Expand Up @@ -512,6 +512,11 @@
<AppenderRef ref="MarketOrderLog"/>
</AsyncLogger>

<AsyncLogger name="MarketOrderPreProcessingLogger"
level="debug" additivity="false">
<AppenderRef ref="MarketOrderLog"/>
</AsyncLogger>

<AsyncLogger name="com.lykke.matching.engine.services.validators.impl.MarketOrderValidatorImpl"
level="debug" additivity="false">
<AppenderRef ref="MarketOrderLog"/>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,11 @@ open class DatabaseAccessorConfig {
open fun cashTransferPreprocessorPersistenceManager(cashTransferOperationsPreprocessorRedisConnection: Optional<RedisConnection>): PersistenceManager {
return persistenceManagerFactory.get(cashTransferOperationsPreprocessorRedisConnection)
}

@Bean
open fun marketOrderPreprocessorPersistenceManager(marketOrderOrderPreprocessorRedisConnection: Optional<RedisConnection>): PersistenceManager {
return persistenceManagerFactory.get(marketOrderOrderPreprocessorRedisConnection)
}
//</editor-fold>


Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,11 @@ open class LoggerConfig {
return LoggerFactory.getLogger("AppStarter")
}

@Bean
open fun marketOrderPreProcessingLogger(): ThrottlingLogger {
return ThrottlingLogger.getLogger("MarketOrderPreProcessingLogger")
}

@Bean
open fun singleLimitOrderPreProcessingLogger(): ThrottlingLogger {
return ThrottlingLogger.getLogger("SingleLimitOrderPreProcessing")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,11 @@ open class RedisConfig {
open fun cashTransferOperationsPreprocessorRedisConnection(): RedisConnection? {
return redisConnectionFactory.getConnection("cashTransferOperationsPreprocessorRedisConnection")
}

@Bean
fun marketOrderOrderPreprocessorRedisConnection(): RedisConnection? {
return redisConnectionFactory.getConnection("marketOrderPreprocessorRedisConnection")
}
//</editor-fold>

//<editor-fold desc="Redis database accessors">
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ class MarketOrder(id: String,
volume: BigDecimal,
var price: BigDecimal?,
status: String,
statusDate: Date,
statusDate: Date?,
createdAt: Date,
registered: Date?,
var matchedAt: Date?,
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
package com.lykke.matching.engine.daos.context

import com.lykke.matching.engine.daos.Asset
import com.lykke.matching.engine.daos.AssetPair
import com.lykke.matching.engine.daos.v2.FeeInstruction
import com.lykke.matching.engine.daos.MarketOrder
import com.lykke.matching.engine.daos.fee.v2.NewFeeInstruction
import com.lykke.matching.engine.deduplication.ProcessedMessage
import java.math.BigDecimal

class MarketOrderContext(val messageId: String,
val assetPair: AssetPair?,
val baseAsset: Asset?,
val quotingAsset: Asset?,
val fee: FeeInstruction?,
val fees: List<NewFeeInstruction>,
val marketPriceDeviationThreshold: BigDecimal?,
val processedMessage: ProcessedMessage,
val marketOrder: MarketOrder)
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
package com.lykke.matching.engine.incoming.parsers.data

import com.lykke.matching.engine.messages.MessageWrapper

class MarketOrderParsedData(messageWrapper: MessageWrapper): ParsedData(messageWrapper)
Original file line number Diff line number Diff line change
@@ -0,0 +1,86 @@
package com.lykke.matching.engine.incoming.parsers.impl

import com.lykke.matching.engine.daos.AssetPair
import com.lykke.matching.engine.daos.MarketOrder
import com.lykke.matching.engine.daos.fee.v2.NewFeeInstruction
import com.lykke.matching.engine.daos.v2.FeeInstruction
import com.lykke.matching.engine.deduplication.ProcessedMessage
import com.lykke.matching.engine.fee.listOfFee
import com.lykke.matching.engine.holders.ApplicationSettingsHolder
import com.lykke.matching.engine.holders.AssetsPairsHolder
import com.lykke.matching.engine.holders.UUIDHolder
import com.lykke.matching.engine.incoming.parsers.ContextParser
import com.lykke.matching.engine.incoming.parsers.data.MarketOrderParsedData
import com.lykke.matching.engine.messages.MessageWrapper
import com.lykke.matching.engine.messages.ProtocolMessages
import com.lykke.matching.engine.order.OrderStatus
import com.lykke.matching.engine.daos.context.MarketOrderContext
import com.lykke.matching.engine.holders.AssetsHolder
import org.springframework.stereotype.Component
import java.math.BigDecimal
import java.util.*

@Component
class MarketOrderContextParser(private val assetsPairsHolder: AssetsPairsHolder,
private val assetsHolder: AssetsHolder,
private val uuidHolder: UUIDHolder,
private val applicationSettingsHolder: ApplicationSettingsHolder) : ContextParser<MarketOrderParsedData> {
override fun parse(messageWrapper: MessageWrapper): MarketOrderParsedData {
val protoMessage = parse(messageWrapper.byteArray)

messageWrapper.messageId = if (protoMessage.hasMessageId()) protoMessage.messageId else protoMessage.uid
messageWrapper.timestamp = protoMessage.timestamp
messageWrapper.parsedMessage = protoMessage
messageWrapper.id = protoMessage.uid
val processedMessage = ProcessedMessage(messageWrapper.type, messageWrapper.timestamp!!, messageWrapper.messageId!!)
messageWrapper.processedMessage = processedMessage

messageWrapper.context = getContext(messageWrapper.messageId!!, protoMessage, processedMessage)

return MarketOrderParsedData(messageWrapper)
}

private fun getContext(messageId: String, message: ProtocolMessages.MarketOrder, processedMessage: ProcessedMessage): MarketOrderContext {
val assetPair = assetsPairsHolder.getAssetPairAllowNulls(message.assetPairId)
val marketOrder = getMarketOrder(message)
return MarketOrderContext(messageId,
assetPair,
getBaseAsset(marketOrder, assetPair)?.let { assetsHolder.getAssetAllowNulls(it) },
getQuotingAsset(marketOrder, assetPair)?.let { assetsHolder.getAssetAllowNulls(it) },
getFeeInstruction(message),
getFeeInstructions(message),
assetPair?.marketOrderPriceDeviationThreshold
?: applicationSettingsHolder.marketOrderPriceDeviationThreshold(message.assetPairId),
processedMessage,
marketOrder)
}

private fun getMarketOrder(message: ProtocolMessages.MarketOrder): MarketOrder {
val feeInstruction = getFeeInstruction(message)
val feeInstructions = getFeeInstructions(message)

return MarketOrder(uuidHolder.getNextValue(), message.uid, message.assetPairId, message.clientId, BigDecimal.valueOf(message.volume), null,
OrderStatus.Processing.name, null, Date(message.timestamp), null, null, message.straight, BigDecimal.valueOf(message.reservedLimitVolume),
feeInstruction, listOfFee(feeInstruction, feeInstructions))
}

private fun getFeeInstruction(message: ProtocolMessages.MarketOrder): FeeInstruction? {
return if (message.hasFee()) FeeInstruction.create(message.fee) else null
}

private fun getFeeInstructions(message: ProtocolMessages.MarketOrder): List<NewFeeInstruction> {
return NewFeeInstruction.create(message.feesList)
}

private fun getBaseAsset(order: MarketOrder, assetPair: AssetPair?): String? {
return if (order.isStraight()) assetPair?.baseAssetId else assetPair?.quotingAssetId
}

private fun getQuotingAsset(order: MarketOrder, assetPair: AssetPair?): String? {
return if (order.isStraight()) assetPair?.quotingAssetId else assetPair?.baseAssetId
}

private fun parse(array: ByteArray): ProtocolMessages.MarketOrder {
return ProtocolMessages.MarketOrder.parseFrom(array)
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,138 @@
package com.lykke.matching.engine.incoming.preprocessor.impl

import com.lykke.matching.engine.daos.MarketOrder
import com.lykke.matching.engine.holders.MessageProcessingStatusHolder
import com.lykke.matching.engine.incoming.parsers.data.MarketOrderParsedData
import com.lykke.matching.engine.incoming.parsers.impl.MarketOrderContextParser
import com.lykke.matching.engine.incoming.preprocessor.AbstractMessagePreprocessor
import com.lykke.matching.engine.messages.MessageStatus
import com.lykke.matching.engine.messages.MessageWrapper
import com.lykke.matching.engine.daos.context.MarketOrderContext
import com.lykke.matching.engine.database.PersistenceManager
import com.lykke.matching.engine.database.common.entity.PersistenceData
import com.lykke.matching.engine.deduplication.ProcessedMessagesCache
import com.lykke.matching.engine.holders.MessageSequenceNumberHolder
import com.lykke.matching.engine.messages.MessageType
import com.lykke.matching.engine.messages.ProtocolMessages
import com.lykke.matching.engine.order.OrderStatus
import com.lykke.matching.engine.outgoing.messages.MarketOrderWithTrades
import com.lykke.matching.engine.outgoing.messages.v2.builders.EventFactory
import com.lykke.matching.engine.services.MessageSender
import com.lykke.matching.engine.services.validators.impl.OrderValidationException
import com.lykke.matching.engine.services.validators.input.MarketOrderInputValidator
import com.lykke.matching.engine.utils.order.MessageStatusUtils
import com.lykke.utils.logging.MetricsLogger
import com.lykke.utils.logging.ThrottlingLogger
import org.springframework.beans.factory.annotation.Autowired
import org.springframework.beans.factory.annotation.Qualifier
import org.springframework.stereotype.Component
import java.util.*
import java.util.concurrent.BlockingQueue

@Component
class MarketOrderPreprocessor(marketOrderContextParser: MarketOrderContextParser,
preProcessedMessageQueue: BlockingQueue<MessageWrapper>,
private val messageProcessingStatusHolder: MessageProcessingStatusHolder,
private val rabbitSwapQueue: BlockingQueue<MarketOrderWithTrades>,
private val messageSender: MessageSender,
private val processedMessagesCache: ProcessedMessagesCache,
private val marketOrderPreprocessorPersistenceManager: PersistenceManager,
private val messageSequenceNumberHolder: MessageSequenceNumberHolder,
@Qualifier("marketOrderPreProcessingLogger")
private val logger: ThrottlingLogger) :
AbstractMessagePreprocessor<MarketOrderParsedData>(marketOrderContextParser,
messageProcessingStatusHolder,
preProcessedMessageQueue,
logger) {

private companion object {
private val METRICS_LOGGER = MetricsLogger.getLogger()
}

@Autowired
private lateinit var marketOrderInputValidator: MarketOrderInputValidator

override fun preProcessParsedData(parsedData: MarketOrderParsedData): Boolean {
val marketOrderContext = parsedData.messageWrapper.context as MarketOrderContext

if (messageProcessingStatusHolder.isTradeDisabled(marketOrderContext.assetPair)) {
writeResponse(parsedData.messageWrapper, MessageStatus.MESSAGE_PROCESSING_DISABLED)
return false
}

if (!validateData(marketOrderContext, parsedData.messageWrapper)) {
return false
}

return true
}

private fun validateData(marketOrderContext: MarketOrderContext, messageWrapper: MessageWrapper): Boolean {
try {
marketOrderInputValidator.performValidation(marketOrderContext)
} catch (e: OrderValidationException) {
processInvalidData(messageWrapper, marketOrderContext, e.orderStatus, e.message)
return false
}

return true
}

private fun writeResponse(messageWrapper: MessageWrapper, order: MarketOrder, status: MessageStatus, reason: String? = null) {
val marketOrderResponse = ProtocolMessages.MarketOrderResponse.newBuilder()
.setStatus(status.type)
if (order.price != null) {
marketOrderResponse.price = order.price!!.toDouble()
} else if (reason != null) {
marketOrderResponse.statusReason = reason
}
messageWrapper.writeMarketOrderResponse(marketOrderResponse)
}

private fun writeErrorResponse(messageWrapper: MessageWrapper,
order: MarketOrder,
statusReason: String? = null) {
writeResponse(messageWrapper, order, MessageStatusUtils.toMessageStatus(order.status), statusReason)
}

fun processInvalidData(messageWrapper: MessageWrapper,
context: MarketOrderContext,
status: OrderStatus,
message: String) {
val marketOrder = context.marketOrder
marketOrder.updateStatus(status, Date())
logger.info("Input validation failed messageId: ${context.messageId}, details: $message")


//todo: Should we send RMQ notification ?
sendErrorNotification(messageWrapper, context.marketOrder, Date())
saveProcessedMessage(context)

writeErrorResponse(messageWrapper, marketOrder, message)
}

private fun saveProcessedMessage(context: MarketOrderContext) {

val persistSuccess = marketOrderPreprocessorPersistenceManager.persist(PersistenceData(context.processedMessage))
if (!persistSuccess) {
throw Exception("Persistence error")
}

processedMessagesCache.addMessage(context.processedMessage)
}

private fun sendErrorNotification(messageWrapper: MessageWrapper,
order: MarketOrder,
now: Date) {
order.register(Date())
val marketOrderWithTrades = MarketOrderWithTrades(messageWrapper.messageId!!, order)
rabbitSwapQueue.put(marketOrderWithTrades)
val outgoingMessage = EventFactory.createExecutionEvent(messageSequenceNumberHolder.getNewValue(),
messageWrapper.messageId!!,
messageWrapper.id!!,
now,
MessageType.MARKET_ORDER,
marketOrderWithTrades)
messageSender.sendMessage(outgoingMessage)
}
}
Loading