diff --git a/pom.xml b/pom.xml
index a2e69dc4..8bef9716 100644
--- a/pom.xml
+++ b/pom.xml
@@ -52,6 +52,11 @@
machinegun-proto
1.43-3decc8f
+
+ dev.vality
+ fistful-proto
+ 1.188-f7ce08e
+
dev.vality.geck
filter
diff --git a/src/main/java/dev/vality/analytics/computer/WithdrawalCashFlowComputer.java b/src/main/java/dev/vality/analytics/computer/WithdrawalCashFlowComputer.java
new file mode 100644
index 00000000..5d10e80d
--- /dev/null
+++ b/src/main/java/dev/vality/analytics/computer/WithdrawalCashFlowComputer.java
@@ -0,0 +1,92 @@
+package dev.vality.analytics.computer;
+
+import dev.vality.analytics.domain.WithdrawalCashFlowResult;
+import dev.vality.fistful.cashflow.CashFlowAccount;
+import dev.vality.fistful.cashflow.FinalCashFlowPosting;
+import dev.vality.fistful.cashflow.MerchantCashFlowAccount;
+import dev.vality.fistful.cashflow.ProviderCashFlowAccount;
+import dev.vality.fistful.cashflow.SystemCashFlowAccount;
+import dev.vality.fistful.cashflow.WalletCashFlowAccount;
+import org.springframework.stereotype.Service;
+import org.springframework.util.CollectionUtils;
+
+import java.util.EnumMap;
+import java.util.List;
+
+@Service
+public class WithdrawalCashFlowComputer {
+
+ public WithdrawalCashFlowResult compute(List cashFlow) {
+ EnumMap cashFlowMap = new EnumMap<>(WithdrawalCashFlowType.class);
+
+ if (CollectionUtils.isEmpty(cashFlow)) {
+ return WithdrawalCashFlowResult.EMPTY;
+ }
+
+ for (FinalCashFlowPosting posting : cashFlow) {
+ if (posting == null || !posting.isSetSource() || !posting.isSetDestination() || !posting.isSetVolume()
+ || !posting.getSource().isSetAccountType() || !posting.getDestination().isSetAccountType()) {
+ continue;
+ }
+
+ WithdrawalCashFlowType type = getCashFlowType(
+ posting.getSource().getAccountType(),
+ posting.getDestination().getAccountType());
+ cashFlowMap.put(type, posting.getVolume().getAmount());
+ }
+
+ return WithdrawalCashFlowResult.builder()
+ .amount(cashFlowMap.getOrDefault(WithdrawalCashFlowType.AMOUNT, 0L))
+ .systemFee(cashFlowMap.getOrDefault(WithdrawalCashFlowType.FEE, 0L))
+ .providerFee(cashFlowMap.getOrDefault(WithdrawalCashFlowType.PROVIDER_FEE, 0L))
+ .build();
+ }
+
+ private WithdrawalCashFlowType getCashFlowType(CashFlowAccount source, CashFlowAccount destination) {
+ if (isWalletSenderSettlement(source) && isWalletReceiverDestination(destination)) {
+ return WithdrawalCashFlowType.AMOUNT;
+ }
+
+ if (isWalletSenderSettlement(source) && isSystemSettlement(destination)) {
+ return WithdrawalCashFlowType.FEE;
+ }
+
+ if (isSystemSettlement(source) && isProviderSettlement(destination)) {
+ return WithdrawalCashFlowType.PROVIDER_FEE;
+ }
+
+ if (isMerchantSettlement(source) && isProviderSettlement(destination)) {
+ return WithdrawalCashFlowType.REFUND_AMOUNT;
+ }
+
+ return WithdrawalCashFlowType.UNKNOWN;
+ }
+
+ private boolean isWalletSenderSettlement(CashFlowAccount account) {
+ return account.isSetWallet() && account.getWallet() == WalletCashFlowAccount.sender_settlement;
+ }
+
+ private boolean isWalletReceiverDestination(CashFlowAccount account) {
+ return account.isSetWallet() && account.getWallet() == WalletCashFlowAccount.receiver_destination;
+ }
+
+ private boolean isSystemSettlement(CashFlowAccount account) {
+ return account.isSetSystem() && account.getSystem() == SystemCashFlowAccount.settlement;
+ }
+
+ private boolean isProviderSettlement(CashFlowAccount account) {
+ return account.isSetProvider() && account.getProvider() == ProviderCashFlowAccount.settlement;
+ }
+
+ private boolean isMerchantSettlement(CashFlowAccount account) {
+ return account.isSetMerchant() && account.getMerchant() == MerchantCashFlowAccount.settlement;
+ }
+
+ private enum WithdrawalCashFlowType {
+ AMOUNT,
+ FEE,
+ PROVIDER_FEE,
+ REFUND_AMOUNT,
+ UNKNOWN
+ }
+}
diff --git a/src/main/java/dev/vality/analytics/config/KafkaConfig.java b/src/main/java/dev/vality/analytics/config/KafkaConfig.java
index 600b39e8..0a09f3ed 100644
--- a/src/main/java/dev/vality/analytics/config/KafkaConfig.java
+++ b/src/main/java/dev/vality/analytics/config/KafkaConfig.java
@@ -36,6 +36,7 @@ public class KafkaConfig {
private static final String RESULT_ANALYTICS = "result-analytics";
private static final String PARTY_RESULT_ANALYTICS = "party-result-analytics";
private static final String DOMINANT_ANALYTICS = "dominant-analytics";
+ private static final String WITHDRAWAL_ANALYTICS = "withdrawal-analytics";
private final ConsumerGroupIdService consumerGroupIdService;
private final KafkaProperties kafkaProperties;
@@ -53,6 +54,8 @@ public class KafkaConfig {
private String maxPollRecordsDominantListener;
@Value("${kafka.topic.rate.max.poll.records}")
private String maxPollRecordsRatesListener;
+ @Value("${kafka.topic.withdrawal.max.poll.records}")
+ private String maxPollRecordsWithdrawalListener;
@Value("${kafka.consumer.concurrency}")
private int concurrencyListeners;
@Value("${kafka.topic.rate.groupId}")
@@ -88,6 +91,16 @@ public ConcurrentKafkaListenerContainerFactory dominan
return factory;
}
+ @Bean
+ public ConcurrentKafkaListenerContainerFactory withdrawalListenerContainerFactory() {
+ ConcurrentKafkaListenerContainerFactory factory =
+ new ConcurrentKafkaListenerContainerFactory<>();
+ String consumerGroup = consumerGroupIdService.generateGroupId(WITHDRAWAL_ANALYTICS);
+ initDefaultListenerProperties(factory, consumerGroup,
+ new MachineEventDeserializer(), maxPollRecordsWithdrawalListener);
+ return factory;
+ }
+
@Bean
public ConcurrentKafkaListenerContainerFactory rateContainerFactory() {
ConcurrentKafkaListenerContainerFactory factory =
diff --git a/src/main/java/dev/vality/analytics/config/SerializeConfig.java b/src/main/java/dev/vality/analytics/config/SerializeConfig.java
index e601706d..6886ad98 100644
--- a/src/main/java/dev/vality/analytics/config/SerializeConfig.java
+++ b/src/main/java/dev/vality/analytics/config/SerializeConfig.java
@@ -1,5 +1,6 @@
package dev.vality.analytics.config;
+import dev.vality.fistful.withdrawal.TimestampedChange;
import dev.vality.damsel.domain_config_v2.HistoricalCommit;
import dev.vality.exrates.events.CurrencyEvent;
import dev.vality.geck.serializer.Geck;
@@ -33,4 +34,20 @@ public HistoricalCommit deserialize(byte[] bytes) {
});
}
+ @Bean
+ public BinaryDeserializer withdrawalTimestampedChangeBinaryDeserializer() {
+ return new AbstractThriftBinaryDeserializer<>() {
+ @Override
+ public TimestampedChange deserialize(byte[] bytes) {
+ return Geck.msgPackToTBase(bytes, TimestampedChange.class);
+ }
+ };
+ }
+
+ @Bean
+ public MachineEventParser withdrawalTimestampedChangeMachineEventParser(
+ BinaryDeserializer withdrawalTimestampedChangeBinaryDeserializer) {
+ return new MachineEventParser<>(withdrawalTimestampedChangeBinaryDeserializer);
+ }
+
}
diff --git a/src/main/java/dev/vality/analytics/constant/EventType.java b/src/main/java/dev/vality/analytics/constant/EventType.java
index fc943c0e..dc35dbb2 100644
--- a/src/main/java/dev/vality/analytics/constant/EventType.java
+++ b/src/main/java/dev/vality/analytics/constant/EventType.java
@@ -24,7 +24,10 @@ public enum EventType {
".payload.session_transaction_bound", new IsNullCondition().not()),
INVOICE_PAYMENT_RISK_SCORE_CHANGED("invoice_payment_change.payload.invoice_payment_risk_score_changed",
new IsNullCondition().not()),
- RATE_CREATED("created", new IsNullCondition().not());
+ WITHDRAWAL_CREATED("change.created", new IsNullCondition().not()),
+ WITHDRAWAL_ROUTE_CHANGED("change.route", new IsNullCondition().not()),
+ WITHDRAWAL_TRANSFER_CHANGED("change.transfer.payload.created", new IsNullCondition().not()),
+ WITHDRAWAL_STATUS_CHANGED("change.status_changed", new IsNullCondition().not());
Filter filter;
diff --git a/src/main/java/dev/vality/analytics/constant/WithdrawalStatus.java b/src/main/java/dev/vality/analytics/constant/WithdrawalStatus.java
new file mode 100644
index 00000000..69f7fd31
--- /dev/null
+++ b/src/main/java/dev/vality/analytics/constant/WithdrawalStatus.java
@@ -0,0 +1,7 @@
+package dev.vality.analytics.constant;
+
+public enum WithdrawalStatus {
+ pending,
+ succeeded,
+ failed
+}
diff --git a/src/main/java/dev/vality/analytics/dao/model/WithdrawalRow.java b/src/main/java/dev/vality/analytics/dao/model/WithdrawalRow.java
new file mode 100644
index 00000000..3c720ec8
--- /dev/null
+++ b/src/main/java/dev/vality/analytics/dao/model/WithdrawalRow.java
@@ -0,0 +1,32 @@
+package dev.vality.analytics.dao.model;
+
+import dev.vality.analytics.constant.WithdrawalStatus;
+import lombok.AllArgsConstructor;
+import lombok.Builder;
+import lombok.Data;
+import lombok.NoArgsConstructor;
+
+import java.time.LocalDateTime;
+
+@Data
+@Builder
+@NoArgsConstructor
+@AllArgsConstructor
+public class WithdrawalRow {
+
+ private LocalDateTime eventTime;
+ private String partyId;
+ private String withdrawalId;
+ private long sequenceId;
+ private LocalDateTime withdrawalTime;
+ private String walletId;
+ private String destinationId;
+ private String providerId;
+ private String terminal;
+ private long amount;
+ private long systemFee;
+ private long providerFee;
+ private String currency;
+ private WithdrawalStatus status;
+
+}
diff --git a/src/main/java/dev/vality/analytics/dao/model/WithdrawalStateSnapshot.java b/src/main/java/dev/vality/analytics/dao/model/WithdrawalStateSnapshot.java
new file mode 100644
index 00000000..9bf1da0f
--- /dev/null
+++ b/src/main/java/dev/vality/analytics/dao/model/WithdrawalStateSnapshot.java
@@ -0,0 +1,31 @@
+package dev.vality.analytics.dao.model;
+
+import lombok.AllArgsConstructor;
+import lombok.Builder;
+import lombok.Data;
+import lombok.NoArgsConstructor;
+
+import java.time.LocalDateTime;
+
+@Data
+@Builder(toBuilder = true)
+@NoArgsConstructor
+@AllArgsConstructor
+public class WithdrawalStateSnapshot {
+
+ private String withdrawalId;
+ private String partyId;
+ private String walletId;
+ private String destinationId;
+ private String currency;
+ private Long requestedAmount;
+ private Long amount;
+ private Long systemFee;
+ private Long providerFee;
+ private LocalDateTime withdrawalCreatedAt;
+ private String providerId;
+ private String terminal;
+ private long lastSequenceId;
+ private LocalDateTime updatedAt;
+
+}
diff --git a/src/main/java/dev/vality/analytics/dao/repository/clickhouse/ClickHouseWithdrawalBatchPreparedStatementSetter.java b/src/main/java/dev/vality/analytics/dao/repository/clickhouse/ClickHouseWithdrawalBatchPreparedStatementSetter.java
new file mode 100644
index 00000000..9058d818
--- /dev/null
+++ b/src/main/java/dev/vality/analytics/dao/repository/clickhouse/ClickHouseWithdrawalBatchPreparedStatementSetter.java
@@ -0,0 +1,63 @@
+package dev.vality.analytics.dao.repository.clickhouse;
+
+import dev.vality.analytics.constant.ClickHouseUtilsValue;
+import dev.vality.analytics.dao.model.WithdrawalRow;
+import org.springframework.jdbc.core.BatchPreparedStatementSetter;
+
+import java.sql.PreparedStatement;
+import java.sql.SQLException;
+import java.time.ZoneOffset;
+import java.time.temporal.ChronoUnit;
+import java.util.List;
+
+public class ClickHouseWithdrawalBatchPreparedStatementSetter implements BatchPreparedStatementSetter {
+
+ public static final String INSERT = "INSERT INTO analytic.events_sink_withdrawal " +
+ "(timestamp, eventTime, eventTimeHour, partyId, withdrawalId, sequenceId, withdrawalTime, " +
+ "walletId, " +
+ "destinationId, providerId, terminal, amount, systemFee, providerFee, " +
+ "currency, status) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)";
+
+ private final List batch;
+
+ public ClickHouseWithdrawalBatchPreparedStatementSetter(List batch) {
+ this.batch = batch;
+ }
+
+ @Override
+ public void setValues(PreparedStatement ps, int i) throws SQLException {
+ WithdrawalRow row = batch.get(i);
+ int column = 1;
+ ps.setObject(column++, row.getEventTime().toLocalDate());
+ ps.setLong(column++, row.getEventTime().toEpochSecond(ZoneOffset.UTC));
+ ps.setLong(
+ column++,
+ row.getEventTime().toInstant(ZoneOffset.UTC).truncatedTo(ChronoUnit.HOURS).toEpochMilli());
+ ps.setString(column++, defaultString(row.getPartyId()));
+ ps.setString(column++, defaultString(row.getWithdrawalId()));
+ ps.setLong(column++, row.getSequenceId());
+ ps.setLong(column++, row.getWithdrawalTime().toEpochSecond(ZoneOffset.UTC));
+ ps.setString(column++, defaultString(row.getWalletId()));
+ ps.setString(column++, defaultString(row.getDestinationId()));
+ ps.setString(column++, defaultString(row.getProviderId()));
+ ps.setString(column++, defaultString(row.getTerminal()));
+ ps.setLong(column++, safeUnsigned(row.getAmount()));
+ ps.setLong(column++, safeUnsigned(row.getSystemFee()));
+ ps.setLong(column++, safeUnsigned(row.getProviderFee()));
+ ps.setString(column++, defaultString(row.getCurrency()));
+ ps.setString(column, row.getStatus().name());
+ }
+
+ @Override
+ public int getBatchSize() {
+ return batch.size();
+ }
+
+ private String defaultString(String value) {
+ return value != null ? value : ClickHouseUtilsValue.UNKNOWN;
+ }
+
+ private long safeUnsigned(long value) {
+ return Math.max(0L, value);
+ }
+}
diff --git a/src/main/java/dev/vality/analytics/dao/repository/clickhouse/ClickHouseWithdrawalRepository.java b/src/main/java/dev/vality/analytics/dao/repository/clickhouse/ClickHouseWithdrawalRepository.java
new file mode 100644
index 00000000..1af9425a
--- /dev/null
+++ b/src/main/java/dev/vality/analytics/dao/repository/clickhouse/ClickHouseWithdrawalRepository.java
@@ -0,0 +1,36 @@
+package dev.vality.analytics.dao.repository.clickhouse;
+
+import dev.vality.analytics.dao.model.WithdrawalRow;
+import lombok.RequiredArgsConstructor;
+import lombok.extern.slf4j.Slf4j;
+import org.springframework.jdbc.core.JdbcTemplate;
+import org.springframework.retry.annotation.Backoff;
+import org.springframework.retry.annotation.Retryable;
+import org.springframework.stereotype.Service;
+import org.springframework.util.CollectionUtils;
+
+import java.sql.SQLException;
+import java.util.List;
+
+@Slf4j
+@Service
+@RequiredArgsConstructor
+public class ClickHouseWithdrawalRepository {
+
+ private final JdbcTemplate clickHouseJdbcTemplate;
+
+ @Retryable(retryFor = SQLException.class, backoff = @Backoff(delay = 5000))
+ public void insertBatch(List withdrawalRows) {
+ if (CollectionUtils.isEmpty(withdrawalRows)) {
+ return;
+ }
+
+ log.info("Batch start insert withdrawalRows: {} firstElement: {}",
+ withdrawalRows.size(), withdrawalRows.get(0).getWithdrawalId());
+ clickHouseJdbcTemplate.batchUpdate(
+ ClickHouseWithdrawalBatchPreparedStatementSetter.INSERT,
+ new ClickHouseWithdrawalBatchPreparedStatementSetter(withdrawalRows));
+ log.info("Batch inserted withdrawalRows: {} firstElement: {}",
+ withdrawalRows.size(), withdrawalRows.get(0).getWithdrawalId());
+ }
+}
diff --git a/src/main/java/dev/vality/analytics/dao/repository/postgres/PostgresWithdrawalStateRepository.java b/src/main/java/dev/vality/analytics/dao/repository/postgres/PostgresWithdrawalStateRepository.java
new file mode 100644
index 00000000..ce9880fc
--- /dev/null
+++ b/src/main/java/dev/vality/analytics/dao/repository/postgres/PostgresWithdrawalStateRepository.java
@@ -0,0 +1,96 @@
+package dev.vality.analytics.dao.repository.postgres;
+
+import dev.vality.analytics.dao.model.WithdrawalStateSnapshot;
+import lombok.RequiredArgsConstructor;
+import lombok.extern.slf4j.Slf4j;
+import org.springframework.jdbc.core.JdbcTemplate;
+import org.springframework.stereotype.Service;
+
+import java.sql.ResultSet;
+import java.sql.SQLException;
+import java.util.Optional;
+
+@Slf4j
+@Service
+@RequiredArgsConstructor
+public class PostgresWithdrawalStateRepository {
+
+ private static final String UPSERT = "INSERT INTO analytics.withdrawal_state " +
+ "(withdrawal_id, party_id, wallet_id, destination_id, currency, requested_amount, amount, " +
+ "system_fee, " +
+ "provider_fee, withdrawal_created_at, provider_id, terminal, " +
+ "last_sequence_id, " +
+ "updated_at) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?) " +
+ "ON CONFLICT (withdrawal_id) DO UPDATE SET " +
+ "party_id = EXCLUDED.party_id, " +
+ "wallet_id = EXCLUDED.wallet_id, " +
+ "destination_id = EXCLUDED.destination_id, " +
+ "currency = EXCLUDED.currency, " +
+ "requested_amount = EXCLUDED.requested_amount, " +
+ "amount = EXCLUDED.amount, " +
+ "system_fee = EXCLUDED.system_fee, " +
+ "provider_fee = EXCLUDED.provider_fee, " +
+ "withdrawal_created_at = EXCLUDED.withdrawal_created_at, " +
+ "provider_id = EXCLUDED.provider_id, " +
+ "terminal = EXCLUDED.terminal, " +
+ "last_sequence_id = EXCLUDED.last_sequence_id, " +
+ "updated_at = EXCLUDED.updated_at";
+ private static final String SELECT = "SELECT withdrawal_id, party_id, wallet_id, destination_id, currency, " +
+ "requested_amount, amount, system_fee, provider_fee, withdrawal_created_at, " +
+ "provider_id, terminal, last_sequence_id, updated_at " +
+ "FROM analytics.withdrawal_state " +
+ "WHERE withdrawal_id = ?";
+
+ private final JdbcTemplate postgresJdbcTemplate;
+
+ public Optional findByWithdrawalId(String withdrawalId) {
+ return postgresJdbcTemplate.query(
+ SELECT,
+ (resultSet, rowNum) -> map(resultSet),
+ withdrawalId)
+ .stream()
+ .findFirst();
+ }
+
+ public void upsert(WithdrawalStateSnapshot snapshot) {
+ postgresJdbcTemplate.update(
+ UPSERT,
+ snapshot.getWithdrawalId(),
+ snapshot.getPartyId(),
+ snapshot.getWalletId(),
+ snapshot.getDestinationId(),
+ snapshot.getCurrency(),
+ snapshot.getRequestedAmount(),
+ snapshot.getAmount(),
+ snapshot.getSystemFee(),
+ snapshot.getProviderFee(),
+ snapshot.getWithdrawalCreatedAt(),
+ snapshot.getProviderId(),
+ snapshot.getTerminal(),
+ snapshot.getLastSequenceId(),
+ snapshot.getUpdatedAt());
+ log.debug("Upserted withdrawal state, withdrawalId={}, sequenceId={}",
+ snapshot.getWithdrawalId(), snapshot.getLastSequenceId());
+ }
+
+ private WithdrawalStateSnapshot map(ResultSet resultSet) throws SQLException {
+ return WithdrawalStateSnapshot.builder()
+ .withdrawalId(resultSet.getString("withdrawal_id"))
+ .partyId(resultSet.getString("party_id"))
+ .walletId(resultSet.getString("wallet_id"))
+ .destinationId(resultSet.getString("destination_id"))
+ .currency(resultSet.getString("currency"))
+ .requestedAmount((Long) resultSet.getObject("requested_amount"))
+ .amount((Long) resultSet.getObject("amount"))
+ .systemFee((Long) resultSet.getObject("system_fee"))
+ .providerFee((Long) resultSet.getObject("provider_fee"))
+ .withdrawalCreatedAt(resultSet.getTimestamp("withdrawal_created_at") != null
+ ? resultSet.getTimestamp("withdrawal_created_at").toLocalDateTime()
+ : null)
+ .providerId(resultSet.getString("provider_id"))
+ .terminal(resultSet.getString("terminal"))
+ .lastSequenceId(resultSet.getLong("last_sequence_id"))
+ .updatedAt(resultSet.getTimestamp("updated_at").toLocalDateTime())
+ .build();
+ }
+}
diff --git a/src/main/java/dev/vality/analytics/domain/WithdrawalCashFlowResult.java b/src/main/java/dev/vality/analytics/domain/WithdrawalCashFlowResult.java
new file mode 100644
index 00000000..04e9520e
--- /dev/null
+++ b/src/main/java/dev/vality/analytics/domain/WithdrawalCashFlowResult.java
@@ -0,0 +1,15 @@
+package dev.vality.analytics.domain;
+
+import lombok.Builder;
+import lombok.Data;
+
+@Data
+@Builder
+public class WithdrawalCashFlowResult {
+
+ public static final WithdrawalCashFlowResult EMPTY = new WithdrawalCashFlowResult(0L, 0L, 0L);
+
+ private final long amount;
+ private final long systemFee;
+ private final long providerFee;
+}
diff --git a/src/main/java/dev/vality/analytics/listener/WithdrawalListener.java b/src/main/java/dev/vality/analytics/listener/WithdrawalListener.java
new file mode 100644
index 00000000..f6f28fd9
--- /dev/null
+++ b/src/main/java/dev/vality/analytics/listener/WithdrawalListener.java
@@ -0,0 +1,54 @@
+package dev.vality.analytics.listener;
+
+import dev.vality.analytics.listener.handler.withdrawal.WithdrawalEventHandler;
+import dev.vality.machinegun.eventsink.MachineEvent;
+import lombok.RequiredArgsConstructor;
+import lombok.extern.slf4j.Slf4j;
+import org.springframework.beans.factory.annotation.Value;
+import org.springframework.kafka.annotation.KafkaListener;
+import org.springframework.kafka.support.Acknowledgment;
+import org.springframework.kafka.support.KafkaHeaders;
+import org.springframework.messaging.handler.annotation.Header;
+import org.springframework.stereotype.Service;
+import org.springframework.util.CollectionUtils;
+
+import java.time.Duration;
+import java.util.List;
+
+@Slf4j
+@Service
+@RequiredArgsConstructor
+public class WithdrawalListener {
+
+ private final WithdrawalEventHandler withdrawalEventHandler;
+
+ @Value("${kafka.consumer.throttling-timeout-ms}")
+ private int throttlingTimeout;
+
+ @KafkaListener(
+ autoStartup = "${kafka.listener.withdrawal.enabled}",
+ topics = "${kafka.topic.withdrawal.initial}",
+ containerFactory = "withdrawalListenerContainerFactory")
+ public void listen(
+ List batch,
+ @Header(KafkaHeaders.RECEIVED_PARTITION) int partition,
+ @Header(KafkaHeaders.OFFSET) int offsets,
+ Acknowledgment ack) {
+ log.info("WithdrawalListener listen offsets: {}, partition: {}, batch.size: {}",
+ offsets, partition, batch.size());
+
+ try {
+ if (CollectionUtils.isEmpty(batch)) {
+ ack.acknowledge();
+ return;
+ }
+
+ withdrawalEventHandler.handle(batch);
+ ack.acknowledge();
+ } catch (Exception e) {
+ log.error("Error when WithdrawalListener listen", e);
+ ack.nack(Duration.ofMillis(throttlingTimeout));
+ throw e;
+ }
+ }
+}
diff --git a/src/main/java/dev/vality/analytics/listener/handler/withdrawal/WithdrawalEventContext.java b/src/main/java/dev/vality/analytics/listener/handler/withdrawal/WithdrawalEventContext.java
new file mode 100644
index 00000000..56606f28
--- /dev/null
+++ b/src/main/java/dev/vality/analytics/listener/handler/withdrawal/WithdrawalEventContext.java
@@ -0,0 +1,21 @@
+package dev.vality.analytics.listener.handler.withdrawal;
+
+import dev.vality.analytics.dao.model.WithdrawalStateSnapshot;
+import dev.vality.fistful.withdrawal.TimestampedChange;
+import dev.vality.machinegun.eventsink.MachineEvent;
+import lombok.Builder;
+import lombok.Value;
+
+import java.time.LocalDateTime;
+
+@Value
+@Builder
+public class WithdrawalEventContext {
+
+ MachineEvent machineEvent;
+ TimestampedChange timestampedChange;
+ LocalDateTime eventTime;
+ String withdrawalId;
+ WithdrawalStateSnapshot currentState;
+
+}
diff --git a/src/main/java/dev/vality/analytics/listener/handler/withdrawal/WithdrawalEventHandler.java b/src/main/java/dev/vality/analytics/listener/handler/withdrawal/WithdrawalEventHandler.java
new file mode 100644
index 00000000..cca2eaa8
--- /dev/null
+++ b/src/main/java/dev/vality/analytics/listener/handler/withdrawal/WithdrawalEventHandler.java
@@ -0,0 +1,170 @@
+package dev.vality.analytics.listener.handler.withdrawal;
+
+import dev.vality.analytics.dao.model.WithdrawalRow;
+import dev.vality.analytics.dao.model.WithdrawalStateSnapshot;
+import dev.vality.analytics.dao.repository.clickhouse.ClickHouseWithdrawalRepository;
+import dev.vality.analytics.dao.repository.postgres.PostgresWithdrawalStateRepository;
+import dev.vality.analytics.listener.mapper.withdrawal.WithdrawalMapper;
+import dev.vality.analytics.utils.TimestampUtil;
+import dev.vality.fistful.withdrawal.TimestampedChange;
+import dev.vality.machinegun.eventsink.MachineEvent;
+import dev.vality.sink.common.parser.impl.MachineEventParser;
+import lombok.RequiredArgsConstructor;
+import lombok.extern.slf4j.Slf4j;
+import org.springframework.stereotype.Service;
+import org.springframework.util.CollectionUtils;
+
+import java.time.LocalDateTime;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+
+@Slf4j
+@Service
+@RequiredArgsConstructor
+public class WithdrawalEventHandler {
+
+ private final MachineEventParser withdrawalTimestampedChangeMachineEventParser;
+ private final PostgresWithdrawalStateRepository postgresWithdrawalStateRepository;
+ private final ClickHouseWithdrawalRepository clickHouseWithdrawalRepository;
+ private final List withdrawalMappers;
+
+ public void handle(List batch) {
+ if (CollectionUtils.isEmpty(batch)) {
+ return;
+ }
+
+ Map stateCache = new HashMap<>();
+ List withdrawalRows = new ArrayList<>();
+
+ for (MachineEvent machineEvent : batch) {
+ WithdrawalEventContext context = prepareContext(machineEvent, stateCache);
+ if (context == null) {
+ continue;
+ }
+
+ WithdrawalMappingResult result = map(context);
+ if (result == null) {
+ continue;
+ }
+
+ applyMappingResult(result, stateCache, withdrawalRows);
+ }
+
+ clickHouseWithdrawalRepository.insertBatch(withdrawalRows);
+ }
+
+ private WithdrawalEventContext prepareContext(
+ MachineEvent machineEvent,
+ Map stateCache) {
+ TimestampedChange timestampedChange = parse(machineEvent);
+ if (timestampedChange == null || !timestampedChange.isSetChange()) {
+ return null;
+ }
+
+ LocalDateTime eventTime = TimestampUtil.parseLocalDateTime(timestampedChange.getOccuredAt());
+ if (eventTime == null) {
+ log.warn("Skipping withdrawal event with invalid occured_at, sourceId={}, eventId={}",
+ machineEvent.getSourceId(), machineEvent.getEventId());
+ return null;
+ }
+
+ String withdrawalId = resolveWithdrawalId(machineEvent);
+ if (withdrawalId == null) {
+ log.warn("Skipping withdrawal event without withdrawal id, eventId={}", machineEvent.getEventId());
+ return null;
+ }
+
+ WithdrawalStateSnapshot currentState = getState(withdrawalId, stateCache);
+ if (currentState != null && machineEvent.getEventId() <= currentState.getLastSequenceId()) {
+ log.debug("Skipping stale withdrawal event, withdrawalId={}, eventId={}, lastSequenceId={}",
+ withdrawalId, machineEvent.getEventId(), currentState.getLastSequenceId());
+ return null;
+ }
+
+ return WithdrawalEventContext.builder()
+ .machineEvent(machineEvent)
+ .timestampedChange(timestampedChange)
+ .eventTime(eventTime)
+ .withdrawalId(withdrawalId)
+ .currentState(currentState)
+ .build();
+ }
+
+ private WithdrawalMappingResult map(WithdrawalEventContext context) {
+ for (WithdrawalMapper mapper : withdrawalMappers) {
+ if (mapper.accept(context.getTimestampedChange())) {
+ WithdrawalMappingResult result = mapper.map(context.getTimestampedChange(), context);
+ if (result == null) {
+ logEmptyContextResult(context, mapper);
+ }
+ return result;
+ }
+ }
+ log.debug("No withdrawal mapper matched, withdrawalId={}, eventId={}",
+ context.getWithdrawalId(), context.getMachineEvent().getEventId());
+ return null;
+ }
+
+ private void applyMappingResult(
+ WithdrawalMappingResult result,
+ Map stateCache,
+ List withdrawalRows) {
+ if (result.getStateSnapshot() != null) {
+ cachedUpsert(result.getStateSnapshot(), stateCache);
+ }
+
+ if (result.getWithdrawalRow() != null) {
+ withdrawalRows.add(result.getWithdrawalRow());
+ }
+ }
+
+ private void cachedUpsert(WithdrawalStateSnapshot snapshot, Map stateCache) {
+ postgresWithdrawalStateRepository.upsert(snapshot);
+ stateCache.put(snapshot.getWithdrawalId(), snapshot);
+ }
+
+ private WithdrawalStateSnapshot getState(String withdrawalId, Map stateCache) {
+ WithdrawalStateSnapshot cached = stateCache.get(withdrawalId);
+ if (cached != null) {
+ return cached;
+ }
+
+ Optional stored = postgresWithdrawalStateRepository.findByWithdrawalId(withdrawalId);
+ stored.ifPresent(snapshot -> stateCache.put(withdrawalId, snapshot));
+ return stored.orElse(null);
+ }
+
+ private TimestampedChange parse(MachineEvent machineEvent) {
+ try {
+ return withdrawalTimestampedChangeMachineEventParser.parse(machineEvent);
+ } catch (Exception e) {
+ log.warn("Failed to parse withdrawal event, sourceId={}, eventId={}",
+ machineEvent.getSourceId(), machineEvent.getEventId(), e);
+ return null;
+ }
+ }
+
+ private String resolveWithdrawalId(MachineEvent machineEvent) {
+ if (machineEvent.isSetSourceId()) {
+ return machineEvent.getSourceId();
+ }
+ return null;
+ }
+
+ private void logEmptyContextResult(WithdrawalEventContext context, WithdrawalMapper mapper) {
+ if (context.getCurrentState() == null) {
+ log.warn("Skipping {} change without reducer state, withdrawalId={}, eventId={}",
+ mapper.getChangeType(),
+ context.getWithdrawalId(),
+ context.getMachineEvent().getEventId());
+ } else {
+ log.debug("Skipping {} change because mapper produced no update, withdrawalId={}, eventId={}",
+ mapper.getChangeType(),
+ context.getWithdrawalId(),
+ context.getMachineEvent().getEventId());
+ }
+ }
+}
diff --git a/src/main/java/dev/vality/analytics/listener/handler/withdrawal/WithdrawalMappingResult.java b/src/main/java/dev/vality/analytics/listener/handler/withdrawal/WithdrawalMappingResult.java
new file mode 100644
index 00000000..6048927c
--- /dev/null
+++ b/src/main/java/dev/vality/analytics/listener/handler/withdrawal/WithdrawalMappingResult.java
@@ -0,0 +1,15 @@
+package dev.vality.analytics.listener.handler.withdrawal;
+
+import dev.vality.analytics.dao.model.WithdrawalRow;
+import dev.vality.analytics.dao.model.WithdrawalStateSnapshot;
+import lombok.Builder;
+import lombok.Value;
+
+@Value
+@Builder
+public class WithdrawalMappingResult {
+
+ WithdrawalStateSnapshot stateSnapshot;
+ WithdrawalRow withdrawalRow;
+
+}
diff --git a/src/main/java/dev/vality/analytics/listener/mapper/rate/CurrencyEventMapper.java b/src/main/java/dev/vality/analytics/listener/mapper/rate/CurrencyEventMapper.java
index f351a540..178e0945 100644
--- a/src/main/java/dev/vality/analytics/listener/mapper/rate/CurrencyEventMapper.java
+++ b/src/main/java/dev/vality/analytics/listener/mapper/rate/CurrencyEventMapper.java
@@ -1,12 +1,12 @@
package dev.vality.analytics.listener.mapper.rate;
import dev.vality.analytics.domain.db.tables.pojos.Rate;
+import dev.vality.analytics.utils.TimestampUtil;
import dev.vality.exrates.base.Currency;
import dev.vality.exrates.base.Rational;
import dev.vality.exrates.events.CurrencyEvent;
import dev.vality.exrates.events.CurrencyEventPayload;
import dev.vality.exrates.events.CurrencyExchangeRate;
-import dev.vality.geck.common.util.TypeUtil;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Component;
@@ -69,12 +69,11 @@ private CurrencyExchangeRate validateAndGetExchangeRate(CurrencyEvent event, Str
}
private LocalDateTime parseEventTime(String timestamp, String eventId) {
- try {
- return TypeUtil.stringToLocalDateTime(timestamp);
- } catch (Exception e) {
- log.warn("Failed to parse timestamp '{}' for CurrencyEvent, eventId={}", timestamp, eventId, e);
- return null;
+ LocalDateTime eventTime = TimestampUtil.parseLocalDateTime(timestamp);
+ if (eventTime == null) {
+ log.warn("Failed to parse timestamp '{}' for CurrencyEvent, eventId={}", timestamp, eventId);
}
+ return eventTime;
}
private Rate buildRate(
diff --git a/src/main/java/dev/vality/analytics/listener/mapper/withdrawal/AbstractWithdrawalMapper.java b/src/main/java/dev/vality/analytics/listener/mapper/withdrawal/AbstractWithdrawalMapper.java
new file mode 100644
index 00000000..c62918f3
--- /dev/null
+++ b/src/main/java/dev/vality/analytics/listener/mapper/withdrawal/AbstractWithdrawalMapper.java
@@ -0,0 +1,71 @@
+package dev.vality.analytics.listener.mapper.withdrawal;
+
+import dev.vality.analytics.constant.WithdrawalStatus;
+import dev.vality.analytics.listener.handler.withdrawal.WithdrawalEventContext;
+import dev.vality.analytics.listener.handler.withdrawal.WithdrawalMappingResult;
+import dev.vality.analytics.listener.mapper.AbstractMapper;
+import dev.vality.analytics.utils.TimestampUtil;
+import dev.vality.fistful.withdrawal.TimestampedChange;
+
+import java.time.LocalDateTime;
+
+public abstract class AbstractWithdrawalMapper
+ extends AbstractMapper
+ implements WithdrawalMapper {
+
+ protected LocalDateTime parseTime(String timestamp) {
+ return TimestampUtil.parseLocalDateTime(timestamp);
+ }
+
+ protected String extractProviderId(dev.vality.fistful.withdrawal.Route route) {
+ if (route == null) {
+ return null;
+ }
+
+ if (route.isSetProviderId()) {
+ return String.valueOf(route.getProviderId());
+ }
+
+ if (route.isSetProviderIdLegacy()) {
+ return route.getProviderIdLegacy();
+ }
+
+ return null;
+ }
+
+ protected String extractTerminal(dev.vality.fistful.withdrawal.Route route) {
+ if (route == null) {
+ return null;
+ }
+
+ if (route.isSetTerminalId()) {
+ return String.valueOf(route.getTerminalId());
+ }
+
+ if (route.isSetTerminalIdLegacy()) {
+ return route.getTerminalIdLegacy();
+ }
+
+ return null;
+ }
+
+ protected WithdrawalStatus mapStatus(dev.vality.fistful.withdrawal.status.Status status) {
+ if (status == null) {
+ return null;
+ }
+
+ if (status.isSetPending()) {
+ return WithdrawalStatus.pending;
+ }
+
+ if (status.isSetSucceeded()) {
+ return WithdrawalStatus.succeeded;
+ }
+
+ if (status.isSetFailed()) {
+ return WithdrawalStatus.failed;
+ }
+
+ return null;
+ }
+}
diff --git a/src/main/java/dev/vality/analytics/listener/mapper/withdrawal/WithdrawalCreatedMapper.java b/src/main/java/dev/vality/analytics/listener/mapper/withdrawal/WithdrawalCreatedMapper.java
new file mode 100644
index 00000000..c82b558f
--- /dev/null
+++ b/src/main/java/dev/vality/analytics/listener/mapper/withdrawal/WithdrawalCreatedMapper.java
@@ -0,0 +1,48 @@
+package dev.vality.analytics.listener.mapper.withdrawal;
+
+import dev.vality.analytics.constant.EventType;
+import dev.vality.analytics.dao.model.WithdrawalStateSnapshot;
+import dev.vality.analytics.listener.handler.withdrawal.WithdrawalEventContext;
+import dev.vality.analytics.listener.handler.withdrawal.WithdrawalMappingResult;
+import dev.vality.fistful.withdrawal.TimestampedChange;
+import dev.vality.fistful.withdrawal.Withdrawal;
+import org.springframework.stereotype.Component;
+
+@Component
+public class WithdrawalCreatedMapper extends AbstractWithdrawalMapper {
+
+ @Override
+ public WithdrawalMappingResult map(TimestampedChange change, WithdrawalEventContext context) {
+ Withdrawal withdrawal = change.getChange().getCreated().getWithdrawal();
+ WithdrawalStateSnapshot currentState = context.getCurrentState();
+ WithdrawalStateSnapshot.WithdrawalStateSnapshotBuilder builder = currentState != null
+ ? currentState.toBuilder()
+ : WithdrawalStateSnapshot.builder().withdrawalId(context.getWithdrawalId());
+
+ builder.withdrawalId(context.getWithdrawalId())
+ .partyId(withdrawal.getPartyId())
+ .walletId(withdrawal.getWalletId())
+ .destinationId(withdrawal.getDestinationId())
+ .currency(withdrawal.getBody() != null && withdrawal.getBody().getCurrency() != null
+ ? withdrawal.getBody().getCurrency().getSymbolicCode()
+ : null)
+ .requestedAmount(withdrawal.getBody() != null ? withdrawal.getBody().getAmount() : null)
+ .withdrawalCreatedAt(parseTime(withdrawal.getCreatedAt()))
+ .lastSequenceId(context.getMachineEvent().getEventId())
+ .updatedAt(context.getEventTime());
+
+ if (withdrawal.isSetRoute()) {
+ builder.providerId(extractProviderId(withdrawal.getRoute()))
+ .terminal(extractTerminal(withdrawal.getRoute()));
+ }
+
+ return WithdrawalMappingResult.builder()
+ .stateSnapshot(builder.build())
+ .build();
+ }
+
+ @Override
+ public EventType getChangeType() {
+ return EventType.WITHDRAWAL_CREATED;
+ }
+}
diff --git a/src/main/java/dev/vality/analytics/listener/mapper/withdrawal/WithdrawalMapper.java b/src/main/java/dev/vality/analytics/listener/mapper/withdrawal/WithdrawalMapper.java
new file mode 100644
index 00000000..83d56bc9
--- /dev/null
+++ b/src/main/java/dev/vality/analytics/listener/mapper/withdrawal/WithdrawalMapper.java
@@ -0,0 +1,9 @@
+package dev.vality.analytics.listener.mapper.withdrawal;
+
+import dev.vality.analytics.listener.handler.withdrawal.WithdrawalEventContext;
+import dev.vality.analytics.listener.handler.withdrawal.WithdrawalMappingResult;
+import dev.vality.analytics.listener.mapper.Mapper;
+import dev.vality.fistful.withdrawal.TimestampedChange;
+
+public interface WithdrawalMapper extends Mapper {
+}
diff --git a/src/main/java/dev/vality/analytics/listener/mapper/withdrawal/WithdrawalRouteMapper.java b/src/main/java/dev/vality/analytics/listener/mapper/withdrawal/WithdrawalRouteMapper.java
new file mode 100644
index 00000000..568d27b6
--- /dev/null
+++ b/src/main/java/dev/vality/analytics/listener/mapper/withdrawal/WithdrawalRouteMapper.java
@@ -0,0 +1,35 @@
+package dev.vality.analytics.listener.mapper.withdrawal;
+
+import dev.vality.analytics.constant.EventType;
+import dev.vality.analytics.dao.model.WithdrawalStateSnapshot;
+import dev.vality.analytics.listener.handler.withdrawal.WithdrawalEventContext;
+import dev.vality.analytics.listener.handler.withdrawal.WithdrawalMappingResult;
+import dev.vality.fistful.withdrawal.TimestampedChange;
+import org.springframework.stereotype.Component;
+
+@Component
+public class WithdrawalRouteMapper extends AbstractWithdrawalMapper {
+
+ @Override
+ public WithdrawalMappingResult map(TimestampedChange change, WithdrawalEventContext context) {
+ WithdrawalStateSnapshot currentState = context.getCurrentState();
+ if (currentState == null) {
+ return null;
+ }
+
+ dev.vality.fistful.withdrawal.Route route = change.getChange().getRoute().getRoute();
+ return WithdrawalMappingResult.builder()
+ .stateSnapshot(currentState.toBuilder()
+ .providerId(extractProviderId(route))
+ .terminal(extractTerminal(route))
+ .lastSequenceId(context.getMachineEvent().getEventId())
+ .updatedAt(context.getEventTime())
+ .build())
+ .build();
+ }
+
+ @Override
+ public EventType getChangeType() {
+ return EventType.WITHDRAWAL_ROUTE_CHANGED;
+ }
+}
diff --git a/src/main/java/dev/vality/analytics/listener/mapper/withdrawal/WithdrawalStatusMapper.java b/src/main/java/dev/vality/analytics/listener/mapper/withdrawal/WithdrawalStatusMapper.java
new file mode 100644
index 00000000..8b604d7e
--- /dev/null
+++ b/src/main/java/dev/vality/analytics/listener/mapper/withdrawal/WithdrawalStatusMapper.java
@@ -0,0 +1,63 @@
+package dev.vality.analytics.listener.mapper.withdrawal;
+
+import dev.vality.analytics.constant.EventType;
+import dev.vality.analytics.constant.WithdrawalStatus;
+import dev.vality.analytics.dao.model.WithdrawalRow;
+import dev.vality.analytics.dao.model.WithdrawalStateSnapshot;
+import dev.vality.analytics.listener.handler.withdrawal.WithdrawalEventContext;
+import dev.vality.analytics.listener.handler.withdrawal.WithdrawalMappingResult;
+import dev.vality.fistful.withdrawal.TimestampedChange;
+import org.springframework.stereotype.Component;
+
+import java.util.Optional;
+
+@Component
+public class WithdrawalStatusMapper extends AbstractWithdrawalMapper {
+
+ @Override
+ public WithdrawalMappingResult map(TimestampedChange change, WithdrawalEventContext context) {
+ WithdrawalStateSnapshot currentState = context.getCurrentState();
+ if (currentState == null) {
+ return null;
+ }
+
+ WithdrawalStatus status = mapStatus(change.getChange().getStatusChanged().getStatus());
+ if (status == null) {
+ return null;
+ }
+
+ WithdrawalStateSnapshot updatedState = currentState.toBuilder()
+ .lastSequenceId(context.getMachineEvent().getEventId())
+ .updatedAt(context.getEventTime())
+ .build();
+
+ WithdrawalRow row = WithdrawalRow.builder()
+ .eventTime(context.getEventTime())
+ .partyId(updatedState.getPartyId())
+ .withdrawalId(updatedState.getWithdrawalId())
+ .sequenceId(context.getMachineEvent().getEventId())
+ .withdrawalTime(Optional.ofNullable(updatedState.getWithdrawalCreatedAt())
+ .orElse(context.getEventTime()))
+ .walletId(updatedState.getWalletId())
+ .destinationId(updatedState.getDestinationId())
+ .providerId(updatedState.getProviderId())
+ .terminal(updatedState.getTerminal())
+ .amount(Optional.ofNullable(updatedState.getAmount())
+ .orElse(Optional.ofNullable(updatedState.getRequestedAmount()).orElse(0L)))
+ .systemFee(Optional.ofNullable(updatedState.getSystemFee()).orElse(0L))
+ .providerFee(Optional.ofNullable(updatedState.getProviderFee()).orElse(0L))
+ .currency(updatedState.getCurrency())
+ .status(status)
+ .build();
+
+ return WithdrawalMappingResult.builder()
+ .stateSnapshot(updatedState)
+ .withdrawalRow(row)
+ .build();
+ }
+
+ @Override
+ public EventType getChangeType() {
+ return EventType.WITHDRAWAL_STATUS_CHANGED;
+ }
+}
diff --git a/src/main/java/dev/vality/analytics/listener/mapper/withdrawal/WithdrawalTransferMapper.java b/src/main/java/dev/vality/analytics/listener/mapper/withdrawal/WithdrawalTransferMapper.java
new file mode 100644
index 00000000..caf3c9b1
--- /dev/null
+++ b/src/main/java/dev/vality/analytics/listener/mapper/withdrawal/WithdrawalTransferMapper.java
@@ -0,0 +1,54 @@
+package dev.vality.analytics.listener.mapper.withdrawal;
+
+import dev.vality.analytics.computer.WithdrawalCashFlowComputer;
+import dev.vality.analytics.constant.EventType;
+import dev.vality.analytics.dao.model.WithdrawalStateSnapshot;
+import dev.vality.analytics.domain.WithdrawalCashFlowResult;
+import dev.vality.analytics.listener.handler.withdrawal.WithdrawalEventContext;
+import dev.vality.analytics.listener.handler.withdrawal.WithdrawalMappingResult;
+import dev.vality.fistful.cashflow.FinalCashFlow;
+import dev.vality.fistful.withdrawal.TimestampedChange;
+import lombok.RequiredArgsConstructor;
+import org.springframework.stereotype.Component;
+
+@Component
+@RequiredArgsConstructor
+public class WithdrawalTransferMapper extends AbstractWithdrawalMapper {
+
+ private final WithdrawalCashFlowComputer withdrawalCashFlowComputer;
+
+ @Override
+ public boolean accept(TimestampedChange change) {
+ return getChangeType().getFilter().match(change)
+ && change.getChange().getTransfer().isSetPayload()
+ && change.getChange().getTransfer().getPayload().isSetCreated();
+ }
+
+ @Override
+ public WithdrawalMappingResult map(TimestampedChange change, WithdrawalEventContext context) {
+ WithdrawalStateSnapshot currentState = context.getCurrentState();
+ if (currentState == null) {
+ return null;
+ }
+
+ dev.vality.fistful.transfer.Change transferChange = change.getChange().getTransfer().getPayload();
+ FinalCashFlow cashFlow = transferChange.getCreated().getTransfer().getCashflow();
+ WithdrawalCashFlowResult cashFlowResult = withdrawalCashFlowComputer.compute(
+ cashFlow != null ? cashFlow.getPostings() : null);
+
+ return WithdrawalMappingResult.builder()
+ .stateSnapshot(currentState.toBuilder()
+ .amount(cashFlowResult.getAmount())
+ .systemFee(cashFlowResult.getSystemFee())
+ .providerFee(cashFlowResult.getProviderFee())
+ .lastSequenceId(context.getMachineEvent().getEventId())
+ .updatedAt(context.getEventTime())
+ .build())
+ .build();
+ }
+
+ @Override
+ public EventType getChangeType() {
+ return EventType.WITHDRAWAL_TRANSFER_CHANGED;
+ }
+}
diff --git a/src/main/java/dev/vality/analytics/utils/TimestampUtil.java b/src/main/java/dev/vality/analytics/utils/TimestampUtil.java
new file mode 100644
index 00000000..2d0e3673
--- /dev/null
+++ b/src/main/java/dev/vality/analytics/utils/TimestampUtil.java
@@ -0,0 +1,23 @@
+package dev.vality.analytics.utils;
+
+import dev.vality.geck.common.util.TypeUtil;
+import lombok.experimental.UtilityClass;
+
+import java.time.LocalDateTime;
+
+@UtilityClass
+public class TimestampUtil {
+
+ public static LocalDateTime parseLocalDateTime(String timestamp) {
+ if (timestamp == null) {
+ return null;
+ }
+
+ try {
+ return TypeUtil.stringToLocalDateTime(timestamp);
+ } catch (Exception e) {
+ return null;
+ }
+ }
+
+}
diff --git a/src/main/resources/application.yml b/src/main/resources/application.yml
index 4f88a505..6c3bd33f 100644
--- a/src/main/resources/application.yml
+++ b/src/main/resources/application.yml
@@ -91,6 +91,9 @@ kafka:
dominant:
initial: mg-events-dominant
max.poll.records: 50
+ withdrawal:
+ initial: mg-events-withdrawal
+ max.poll.records: 50
rate:
initial: etl-exchange-rate
groupId: analytics-rate-group
@@ -107,6 +110,8 @@ kafka:
enabled: true
dominant:
enabled: true
+ withdrawal:
+ enabled: true
rate:
enabled: true
diff --git a/src/main/resources/db/migration-clickhouse/non-sharded/V3__recreate_shop_dictionary_with_composite_key.sql b/src/main/resources/db/migration-clickhouse/non-sharded/V3__recreate_shop_dictionary_with_composite_key.sql
new file mode 100644
index 00000000..c0a15f65
--- /dev/null
+++ b/src/main/resources/db/migration-clickhouse/non-sharded/V3__recreate_shop_dictionary_with_composite_key.sql
@@ -0,0 +1,43 @@
+DROP DICTIONARY IF EXISTS analytic.shop_dictionary;
+
+CREATE DICTIONARY IF NOT EXISTS analytic.shop_dictionary (
+ party_id String,
+ shop_id String,
+ id UInt64,
+ event_id UInt64,
+ event_time DateTime,
+ category_id Int32,
+ created_at DateTime,
+ blocking String,
+ blocked_reason String,
+ blocked_since DateTime,
+ unblocked_reason String,
+ unblocked_since DateTime,
+ suspension String,
+ suspension_active_since DateTime,
+ suspension_suspended_since DateTime,
+ details_name String,
+ details_description String,
+ location_url String,
+ account_currency_code String,
+ account_settlement String,
+ account_guarantee String,
+ international_legal_entity_country_code String,
+ version_id UInt64,
+ changed_by_id String,
+ changed_by_email String,
+ changed_by_name String,
+ deleted Bool
+)
+PRIMARY KEY party_id, shop_id
+SOURCE(POSTGRESQL(
+ HOST '<>'
+ PORT <>
+ USER '<>'
+ PASSWORD '<>'
+ DB '<>'
+ SCHEMA '<>'
+ TABLE 'shop'
+ ))
+LAYOUT(COMPLEX_KEY_HASHED())
+LIFETIME(MIN 300 MAX 360);
diff --git a/src/main/resources/db/migration-clickhouse/non-sharded/V4__create_withdrawal_table.sql b/src/main/resources/db/migration-clickhouse/non-sharded/V4__create_withdrawal_table.sql
new file mode 100644
index 00000000..ba1da497
--- /dev/null
+++ b/src/main/resources/db/migration-clickhouse/non-sharded/V4__create_withdrawal_table.sql
@@ -0,0 +1,20 @@
+CREATE TABLE IF NOT EXISTS analytic.events_sink_withdrawal (
+ timestamp Date,
+ eventTime UInt64,
+ eventTimeHour UInt64,
+ partyId String,
+ withdrawalId String,
+ sequenceId UInt64,
+ withdrawalTime UInt64,
+ walletId String,
+ destinationId String,
+ providerId String,
+ terminal String,
+ amount UInt64,
+ systemFee UInt64,
+ providerFee UInt64,
+ currency String,
+ status Enum8('pending' = 1, 'succeeded' = 2, 'failed' = 3)
+) ENGINE = ReplacingMergeTree()
+PARTITION BY toYYYYMM(timestamp)
+ORDER BY (eventTimeHour, partyId, walletId, status, currency, providerId, terminal, withdrawalId, sequenceId);
diff --git a/src/main/resources/db/migration-clickhouse/sharded/V3__recreate_shop_dictionary_with_composite_key.sql b/src/main/resources/db/migration-clickhouse/sharded/V3__recreate_shop_dictionary_with_composite_key.sql
new file mode 100644
index 00000000..c0a15f65
--- /dev/null
+++ b/src/main/resources/db/migration-clickhouse/sharded/V3__recreate_shop_dictionary_with_composite_key.sql
@@ -0,0 +1,43 @@
+DROP DICTIONARY IF EXISTS analytic.shop_dictionary;
+
+CREATE DICTIONARY IF NOT EXISTS analytic.shop_dictionary (
+ party_id String,
+ shop_id String,
+ id UInt64,
+ event_id UInt64,
+ event_time DateTime,
+ category_id Int32,
+ created_at DateTime,
+ blocking String,
+ blocked_reason String,
+ blocked_since DateTime,
+ unblocked_reason String,
+ unblocked_since DateTime,
+ suspension String,
+ suspension_active_since DateTime,
+ suspension_suspended_since DateTime,
+ details_name String,
+ details_description String,
+ location_url String,
+ account_currency_code String,
+ account_settlement String,
+ account_guarantee String,
+ international_legal_entity_country_code String,
+ version_id UInt64,
+ changed_by_id String,
+ changed_by_email String,
+ changed_by_name String,
+ deleted Bool
+)
+PRIMARY KEY party_id, shop_id
+SOURCE(POSTGRESQL(
+ HOST '<>'
+ PORT <>
+ USER '<>'
+ PASSWORD '<>'
+ DB '<>'
+ SCHEMA '<>'
+ TABLE 'shop'
+ ))
+LAYOUT(COMPLEX_KEY_HASHED())
+LIFETIME(MIN 300 MAX 360);
diff --git a/src/main/resources/db/migration-clickhouse/sharded/V4__create_withdrawal_table.sql b/src/main/resources/db/migration-clickhouse/sharded/V4__create_withdrawal_table.sql
new file mode 100644
index 00000000..c89cf4b7
--- /dev/null
+++ b/src/main/resources/db/migration-clickhouse/sharded/V4__create_withdrawal_table.sql
@@ -0,0 +1,23 @@
+CREATE TABLE IF NOT EXISTS analytic.events_sink_withdrawal_local (
+ timestamp Date,
+ eventTime UInt64,
+ eventTimeHour UInt64,
+ partyId String,
+ withdrawalId String,
+ sequenceId UInt64,
+ withdrawalTime UInt64,
+ walletId String,
+ destinationId String,
+ providerId String,
+ terminal String,
+ amount UInt64,
+ systemFee UInt64,
+ providerFee UInt64,
+ currency String,
+ status Enum8('pending' = 1, 'succeeded' = 2, 'failed' = 3)
+) ENGINE = ReplicatedReplacingMergeTree('/clickhouse/<>/tables/<>/{database}/{table}', '<>')
+PARTITION BY toYYYYMM(timestamp)
+ORDER BY (eventTimeHour, partyId, walletId, status, currency, providerId, terminal, withdrawalId, sequenceId);
+
+CREATE TABLE IF NOT EXISTS analytic.events_sink_withdrawal AS analytic.events_sink_withdrawal_local
+ENGINE = Distributed('<>', analytic, events_sink_withdrawal_local, cityHash64(timestamp, partyId));
diff --git a/src/main/resources/db/migration/V21__add_withdrawal_state.sql b/src/main/resources/db/migration/V21__add_withdrawal_state.sql
new file mode 100644
index 00000000..43055ee1
--- /dev/null
+++ b/src/main/resources/db/migration/V21__add_withdrawal_state.sql
@@ -0,0 +1,17 @@
+CREATE TABLE analytics.withdrawal_state (
+ withdrawal_id VARCHAR NOT NULL,
+ party_id VARCHAR NOT NULL,
+ wallet_id VARCHAR,
+ destination_id VARCHAR,
+ currency VARCHAR,
+ requested_amount BIGINT,
+ amount BIGINT,
+ system_fee BIGINT,
+ provider_fee BIGINT,
+ withdrawal_created_at TIMESTAMP WITHOUT TIME ZONE,
+ provider_id VARCHAR,
+ terminal VARCHAR,
+ last_sequence_id BIGINT NOT NULL,
+ updated_at TIMESTAMP WITHOUT TIME ZONE NOT NULL,
+ CONSTRAINT withdrawal_state_pkey PRIMARY KEY (withdrawal_id)
+);
diff --git a/src/test/java/dev/vality/analytics/computer/WithdrawalCashFlowComputerTest.java b/src/test/java/dev/vality/analytics/computer/WithdrawalCashFlowComputerTest.java
new file mode 100644
index 00000000..6886dc91
--- /dev/null
+++ b/src/test/java/dev/vality/analytics/computer/WithdrawalCashFlowComputerTest.java
@@ -0,0 +1,59 @@
+package dev.vality.analytics.computer;
+
+import dev.vality.analytics.domain.WithdrawalCashFlowResult;
+import dev.vality.analytics.utils.WithdrawalEventTestUtils;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+
+import java.util.List;
+
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.Matchers.is;
+
+public class WithdrawalCashFlowComputerTest {
+
+ private WithdrawalCashFlowComputer withdrawalCashFlowComputer;
+
+ @BeforeEach
+ public void setUp() {
+ withdrawalCashFlowComputer = new WithdrawalCashFlowComputer();
+ }
+
+ @Test
+ public void shouldMapAllSupportedPostingTypes() {
+ WithdrawalCashFlowResult result = withdrawalCashFlowComputer.compute(List.of(
+ WithdrawalEventTestUtils.walletSenderSettlementToReceiverDestination(1000L),
+ WithdrawalEventTestUtils.walletSenderSettlementToSystem(100L),
+ WithdrawalEventTestUtils.systemToProvider(20L),
+ WithdrawalEventTestUtils.systemToExternal(10L)));
+
+ assertThat(result.getAmount(), is(1000L));
+ assertThat(result.getSystemFee(), is(100L));
+ assertThat(result.getProviderFee(), is(20L));
+ }
+
+ @Test
+ public void shouldIgnoreUnsupportedPostings() {
+ WithdrawalCashFlowResult result = withdrawalCashFlowComputer.compute(List.of(
+ WithdrawalEventTestUtils.unrelatedPosting(999L)));
+
+ assertThat(result.getAmount(), is(0L));
+ assertThat(result.getSystemFee(), is(0L));
+ assertThat(result.getProviderFee(), is(0L));
+ }
+
+ @Test
+ public void shouldUseLastPostingPerTypeLikeFistfulAssociateBy() {
+ WithdrawalCashFlowResult result = withdrawalCashFlowComputer.compute(List.of(
+ WithdrawalEventTestUtils.walletSenderSettlementToReceiverDestination(1000L),
+ WithdrawalEventTestUtils.walletSenderSettlementToReceiverDestination(900L),
+ WithdrawalEventTestUtils.walletSenderSettlementToSystem(100L),
+ WithdrawalEventTestUtils.walletSenderSettlementToSystem(90L),
+ WithdrawalEventTestUtils.systemToProvider(20L),
+ WithdrawalEventTestUtils.systemToProvider(15L)));
+
+ assertThat(result.getAmount(), is(900L));
+ assertThat(result.getSystemFee(), is(90L));
+ assertThat(result.getProviderFee(), is(15L));
+ }
+}
diff --git a/src/test/java/dev/vality/analytics/config/KafkaTest.java b/src/test/java/dev/vality/analytics/config/KafkaTest.java
index fb399da9..d311667b 100644
--- a/src/test/java/dev/vality/analytics/config/KafkaTest.java
+++ b/src/test/java/dev/vality/analytics/config/KafkaTest.java
@@ -17,7 +17,8 @@
"kafka.topic.event.sink.initial",
"kafka.topic.party.initial",
"kafka.topic.rate.initial",
- "kafka.topic.dominant.initial"
+ "kafka.topic.dominant.initial",
+ "kafka.topic.withdrawal.initial"
})
@KafkaTestConfig
public @interface KafkaTest {
diff --git a/src/test/java/dev/vality/analytics/listener/WithdrawalListenerTest.java b/src/test/java/dev/vality/analytics/listener/WithdrawalListenerTest.java
new file mode 100644
index 00000000..e8c7fe34
--- /dev/null
+++ b/src/test/java/dev/vality/analytics/listener/WithdrawalListenerTest.java
@@ -0,0 +1,91 @@
+package dev.vality.analytics.listener;
+
+import dev.vality.analytics.config.SpringBootITest;
+import dev.vality.analytics.utils.WithdrawalEventTestUtils;
+import dev.vality.testcontainers.annotations.kafka.config.KafkaProducer;
+import org.apache.thrift.TBase;
+import org.awaitility.Awaitility;
+import org.junit.jupiter.api.Test;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.beans.factory.annotation.Value;
+import org.springframework.jdbc.core.JdbcTemplate;
+
+import java.time.LocalDateTime;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.TimeUnit;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+
+@SpringBootITest
+public class WithdrawalListenerTest {
+
+ @Value("${kafka.topic.withdrawal.initial}")
+ private String withdrawalTopic;
+
+ @Autowired
+ private JdbcTemplate postgresJdbcTemplate;
+ @Autowired
+ private JdbcTemplate clickHouseJdbcTemplate;
+ @Autowired
+ private KafkaProducer> testThriftKafkaProducer;
+
+ @Test
+ public void shouldReduceWithdrawalStateAndWriteClickHouseSnapshot() {
+ seedShopDictionary();
+
+ List> flow = WithdrawalEventTestUtils.fullSuccessFlow();
+ flow.forEach(event -> testThriftKafkaProducer.send(withdrawalTopic, event));
+
+ Awaitility.await().atMost(60, TimeUnit.SECONDS).until(() -> {
+ Integer count = clickHouseJdbcTemplate.queryForObject(
+ "SELECT count(*) FROM analytic.events_sink_withdrawal WHERE withdrawalId = ?",
+ Integer.class,
+ WithdrawalEventTestUtils.WITHDRAWAL_ID);
+ return count != null && count == 1;
+ });
+
+ Map stateRow = postgresJdbcTemplate.queryForMap(
+ "SELECT * FROM analytics.withdrawal_state WHERE withdrawal_id = ?",
+ WithdrawalEventTestUtils.WITHDRAWAL_ID);
+ assertEquals(WithdrawalEventTestUtils.PARTY_ID, stateRow.get("party_id"));
+ assertEquals(WithdrawalEventTestUtils.CURRENCY, stateRow.get("currency"));
+ assertEquals(1200L, ((Number) stateRow.get("amount")).longValue());
+ assertEquals(100L, ((Number) stateRow.get("system_fee")).longValue());
+ assertEquals(20L, ((Number) stateRow.get("provider_fee")).longValue());
+ assertEquals("42", stateRow.get("provider_id"));
+ assertEquals("24", stateRow.get("terminal"));
+
+ Map withdrawalRow = clickHouseJdbcTemplate.queryForMap(
+ "SELECT partyId, currency, providerId, terminal, amount, systemFee, providerFee, status " +
+ "FROM analytic.events_sink_withdrawal WHERE withdrawalId = ? AND status = 'succeeded'",
+ WithdrawalEventTestUtils.WITHDRAWAL_ID);
+ assertEquals(WithdrawalEventTestUtils.PARTY_ID, withdrawalRow.get("partyId"));
+ assertEquals(WithdrawalEventTestUtils.CURRENCY, withdrawalRow.get("currency"));
+ assertEquals("42", withdrawalRow.get("providerId"));
+ assertEquals("24", withdrawalRow.get("terminal"));
+ assertEquals(1200L, ((Number) withdrawalRow.get("amount")).longValue());
+ assertEquals(100L, ((Number) withdrawalRow.get("systemFee")).longValue());
+ assertEquals(20L, ((Number) withdrawalRow.get("providerFee")).longValue());
+
+ clickHouseJdbcTemplate.execute("SYSTEM RELOAD DICTIONARY analytic.shop_dictionary");
+ String locationUrl = clickHouseJdbcTemplate.queryForObject(
+ "SELECT dictGet('analytic.shop_dictionary', 'location_url', tuple(?, ?))",
+ String.class,
+ WithdrawalEventTestUtils.PARTY_ID,
+ "shop-dict-1");
+ assertEquals("https://merchant.example/shop-dict-1", locationUrl);
+ }
+
+ private void seedShopDictionary() {
+ postgresJdbcTemplate.update(
+ "INSERT INTO analytics.shop " +
+ "(event_id, event_time, party_id, shop_id, location_url) " +
+ "VALUES (?, ?, ?, ?, ?)",
+ 1L,
+ LocalDateTime.of(2024, 1, 10, 10, 0),
+ WithdrawalEventTestUtils.PARTY_ID,
+ "shop-dict-1",
+ "https://merchant.example/shop-dict-1");
+ }
+}
diff --git a/src/test/java/dev/vality/analytics/listener/handler/withdrawal/WithdrawalEventHandlerTest.java b/src/test/java/dev/vality/analytics/listener/handler/withdrawal/WithdrawalEventHandlerTest.java
new file mode 100644
index 00000000..cf90eff5
--- /dev/null
+++ b/src/test/java/dev/vality/analytics/listener/handler/withdrawal/WithdrawalEventHandlerTest.java
@@ -0,0 +1,201 @@
+package dev.vality.analytics.listener.handler.withdrawal;
+
+import dev.vality.analytics.computer.WithdrawalCashFlowComputer;
+import dev.vality.analytics.dao.model.WithdrawalRow;
+import dev.vality.analytics.dao.model.WithdrawalStateSnapshot;
+import dev.vality.analytics.dao.repository.clickhouse.ClickHouseWithdrawalRepository;
+import dev.vality.analytics.dao.repository.postgres.PostgresWithdrawalStateRepository;
+import dev.vality.analytics.listener.mapper.withdrawal.WithdrawalCreatedMapper;
+import dev.vality.analytics.listener.mapper.withdrawal.WithdrawalRouteMapper;
+import dev.vality.analytics.listener.mapper.withdrawal.WithdrawalStatusMapper;
+import dev.vality.analytics.listener.mapper.withdrawal.WithdrawalTransferMapper;
+import dev.vality.analytics.utils.WithdrawalEventTestUtils;
+import dev.vality.fistful.withdrawal.TimestampedChange;
+import dev.vality.machinegun.eventsink.MachineEvent;
+import dev.vality.sink.common.parser.impl.MachineEventParser;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.extension.ExtendWith;
+import org.mockito.ArgumentCaptor;
+import org.mockito.Mock;
+import org.mockito.junit.jupiter.MockitoExtension;
+
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.Matchers.hasSize;
+import static org.hamcrest.Matchers.is;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.anyString;
+import static org.mockito.Mockito.doAnswer;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+@ExtendWith(MockitoExtension.class)
+public class WithdrawalEventHandlerTest {
+
+ @Mock
+ private MachineEventParser machineEventParser;
+ @Mock
+ private PostgresWithdrawalStateRepository postgresWithdrawalStateRepository;
+ @Mock
+ private ClickHouseWithdrawalRepository clickHouseWithdrawalRepository;
+
+ private WithdrawalEventHandler withdrawalEventHandler;
+ private Map store;
+
+ @BeforeEach
+ public void setUp() {
+ withdrawalEventHandler = new WithdrawalEventHandler(
+ machineEventParser,
+ postgresWithdrawalStateRepository,
+ clickHouseWithdrawalRepository,
+ List.of(
+ new WithdrawalCreatedMapper(),
+ new WithdrawalRouteMapper(),
+ new WithdrawalTransferMapper(new WithdrawalCashFlowComputer()),
+ new WithdrawalStatusMapper()));
+ store = new HashMap<>();
+
+ when(postgresWithdrawalStateRepository.findByWithdrawalId(anyString()))
+ .thenAnswer(invocation -> Optional.ofNullable(store.get(invocation.getArgument(0))));
+ doAnswer(invocation -> {
+ WithdrawalStateSnapshot snapshot = invocation.getArgument(0);
+ store.put(snapshot.getWithdrawalId(), snapshot);
+ return null;
+ }).when(postgresWithdrawalStateRepository).upsert(any(WithdrawalStateSnapshot.class));
+ }
+
+ @Test
+ public void createdShouldInitializeReducerState() {
+ MachineEvent machineEvent = WithdrawalEventTestUtils.machineEvent(1L,
+ WithdrawalEventTestUtils.createdChange(1500L, 42, 24));
+ when(machineEventParser.parse(machineEvent))
+ .thenReturn(WithdrawalEventTestUtils.createdChange(1500L, 42, 24));
+
+ withdrawalEventHandler.handle(List.of(machineEvent));
+
+ WithdrawalStateSnapshot snapshot = store.get(WithdrawalEventTestUtils.WITHDRAWAL_ID);
+ assertEquals(WithdrawalEventTestUtils.PARTY_ID, snapshot.getPartyId());
+ assertEquals(WithdrawalEventTestUtils.WALLET_ID, snapshot.getWalletId());
+ assertEquals(WithdrawalEventTestUtils.DESTINATION_ID, snapshot.getDestinationId());
+ assertEquals(WithdrawalEventTestUtils.CURRENCY, snapshot.getCurrency());
+ assertEquals(1500L, snapshot.getRequestedAmount());
+ assertEquals("42", snapshot.getProviderId());
+ assertEquals("24", snapshot.getTerminal());
+ }
+
+ @Test
+ public void routeShouldUpdateProviderAndTerminal() {
+ store.put(WithdrawalEventTestUtils.WITHDRAWAL_ID, WithdrawalStateSnapshot.builder()
+ .withdrawalId(WithdrawalEventTestUtils.WITHDRAWAL_ID)
+ .partyId(WithdrawalEventTestUtils.PARTY_ID)
+ .lastSequenceId(1L)
+ .build());
+
+ MachineEvent machineEvent = WithdrawalEventTestUtils.machineEvent(
+ 2L,
+ WithdrawalEventTestUtils.routeChange(77, 55));
+ when(machineEventParser.parse(machineEvent)).thenReturn(WithdrawalEventTestUtils.routeChange(77, 55));
+
+ withdrawalEventHandler.handle(List.of(machineEvent));
+
+ WithdrawalStateSnapshot snapshot = store.get(WithdrawalEventTestUtils.WITHDRAWAL_ID);
+ assertEquals("77", snapshot.getProviderId());
+ assertEquals("55", snapshot.getTerminal());
+ assertEquals(2L, snapshot.getLastSequenceId());
+ }
+
+ @Test
+ public void transferCreatedShouldUpdateMonetaryFields() {
+ store.put(WithdrawalEventTestUtils.WITHDRAWAL_ID, WithdrawalStateSnapshot.builder()
+ .withdrawalId(WithdrawalEventTestUtils.WITHDRAWAL_ID)
+ .partyId(WithdrawalEventTestUtils.PARTY_ID)
+ .lastSequenceId(1L)
+ .build());
+
+ MachineEvent machineEvent = WithdrawalEventTestUtils.machineEvent(3L,
+ WithdrawalEventTestUtils.transferCreatedChange(1200L, 100L, 20L));
+ when(machineEventParser.parse(machineEvent))
+ .thenReturn(WithdrawalEventTestUtils.transferCreatedChange(1200L, 100L, 20L));
+
+ withdrawalEventHandler.handle(List.of(machineEvent));
+
+ WithdrawalStateSnapshot snapshot = store.get(WithdrawalEventTestUtils.WITHDRAWAL_ID);
+ assertEquals(1200L, snapshot.getAmount());
+ assertEquals(100L, snapshot.getSystemFee());
+ assertEquals(20L, snapshot.getProviderFee());
+ }
+
+ @Test
+ public void statusChangedShouldWriteSnapshotRowUsingCurrentState() {
+ store.put(WithdrawalEventTestUtils.WITHDRAWAL_ID, WithdrawalStateSnapshot.builder()
+ .withdrawalId(WithdrawalEventTestUtils.WITHDRAWAL_ID)
+ .partyId(WithdrawalEventTestUtils.PARTY_ID)
+ .walletId(WithdrawalEventTestUtils.WALLET_ID)
+ .destinationId(WithdrawalEventTestUtils.DESTINATION_ID)
+ .currency(WithdrawalEventTestUtils.CURRENCY)
+ .withdrawalCreatedAt(java.time.LocalDateTime.parse("2024-01-10T10:15:30"))
+ .providerId("42")
+ .terminal("24")
+ .amount(1200L)
+ .systemFee(100L)
+ .providerFee(20L)
+ .lastSequenceId(3L)
+ .build());
+
+ MachineEvent machineEvent = WithdrawalEventTestUtils.machineEvent(
+ 4L,
+ WithdrawalEventTestUtils.succeededStatusChange());
+ when(machineEventParser.parse(machineEvent)).thenReturn(WithdrawalEventTestUtils.succeededStatusChange());
+
+ withdrawalEventHandler.handle(List.of(machineEvent));
+
+ ArgumentCaptor> rowsCaptor = ArgumentCaptor.forClass(List.class);
+ verify(clickHouseWithdrawalRepository).insertBatch(rowsCaptor.capture());
+ List rows = rowsCaptor.getValue();
+ assertThat(rows, hasSize(1));
+ WithdrawalRow row = rows.get(0);
+ assertThat(row.getPartyId(), is(WithdrawalEventTestUtils.PARTY_ID));
+ assertThat(row.getCurrency(), is(WithdrawalEventTestUtils.CURRENCY));
+ assertThat(row.getProviderId(), is("42"));
+ assertThat(row.getTerminal(), is("24"));
+ assertThat(row.getAmount(), is(1200L));
+ assertThat(row.getSystemFee(), is(100L));
+ assertThat(row.getProviderFee(), is(20L));
+ assertThat(row.getStatus().name(), is("succeeded"));
+ }
+
+ @Test
+ public void statusChangedShouldFallbackToRequestedAmountWhenTransferMissing() {
+ store.put(WithdrawalEventTestUtils.WITHDRAWAL_ID, WithdrawalStateSnapshot.builder()
+ .withdrawalId(WithdrawalEventTestUtils.WITHDRAWAL_ID)
+ .partyId(WithdrawalEventTestUtils.PARTY_ID)
+ .walletId(WithdrawalEventTestUtils.WALLET_ID)
+ .destinationId(WithdrawalEventTestUtils.DESTINATION_ID)
+ .currency(WithdrawalEventTestUtils.CURRENCY)
+ .withdrawalCreatedAt(java.time.LocalDateTime.parse("2024-01-10T10:15:30"))
+ .requestedAmount(1500L)
+ .lastSequenceId(1L)
+ .build());
+
+ MachineEvent machineEvent = WithdrawalEventTestUtils.machineEvent(
+ 2L,
+ WithdrawalEventTestUtils.pendingStatusChange());
+ when(machineEventParser.parse(machineEvent)).thenReturn(WithdrawalEventTestUtils.pendingStatusChange());
+
+ withdrawalEventHandler.handle(List.of(machineEvent));
+
+ ArgumentCaptor> rowsCaptor = ArgumentCaptor.forClass(List.class);
+ verify(clickHouseWithdrawalRepository).insertBatch(rowsCaptor.capture());
+ WithdrawalRow row = rowsCaptor.getValue().get(0);
+ assertThat(row.getAmount(), is(1500L));
+ assertThat(row.getSystemFee(), is(0L));
+ assertThat(row.getProviderFee(), is(0L));
+ assertThat(row.getStatus().name(), is("pending"));
+ }
+}
diff --git a/src/test/java/dev/vality/analytics/utils/WithdrawalEventTestUtils.java b/src/test/java/dev/vality/analytics/utils/WithdrawalEventTestUtils.java
new file mode 100644
index 00000000..76eaaadb
--- /dev/null
+++ b/src/test/java/dev/vality/analytics/utils/WithdrawalEventTestUtils.java
@@ -0,0 +1,176 @@
+package dev.vality.analytics.utils;
+
+import dev.vality.fistful.base.Cash;
+import dev.vality.fistful.base.CurrencyRef;
+import dev.vality.fistful.cashflow.CashFlowAccount;
+import dev.vality.fistful.cashflow.ExternalCashFlowAccount;
+import dev.vality.fistful.cashflow.FinalCashFlow;
+import dev.vality.fistful.cashflow.FinalCashFlowAccount;
+import dev.vality.fistful.cashflow.FinalCashFlowPosting;
+import dev.vality.fistful.cashflow.ProviderCashFlowAccount;
+import dev.vality.fistful.cashflow.SystemCashFlowAccount;
+import dev.vality.fistful.cashflow.WalletCashFlowAccount;
+import dev.vality.fistful.transfer.Transfer;
+import dev.vality.fistful.withdrawal.Change;
+import dev.vality.fistful.withdrawal.CreatedChange;
+import dev.vality.fistful.withdrawal.Route;
+import dev.vality.fistful.withdrawal.RouteChange;
+import dev.vality.fistful.withdrawal.StatusChange;
+import dev.vality.fistful.withdrawal.TimestampedChange;
+import dev.vality.fistful.withdrawal.TransferChange;
+import dev.vality.fistful.withdrawal.Withdrawal;
+import dev.vality.fistful.withdrawal.status.Pending;
+import dev.vality.fistful.withdrawal.status.Status;
+import dev.vality.fistful.withdrawal.status.Succeeded;
+import dev.vality.geck.common.util.TypeUtil;
+import dev.vality.geck.serializer.Geck;
+import dev.vality.machinegun.eventsink.MachineEvent;
+import dev.vality.machinegun.eventsink.SinkEvent;
+import org.apache.thrift.TBase;
+
+import java.time.Instant;
+import java.util.List;
+
+public final class WithdrawalEventTestUtils {
+
+ public static final String WITHDRAWAL_ID = "withdrawal-1";
+ public static final String PARTY_ID = "party-1";
+ public static final String WALLET_ID = "wallet-1";
+ public static final String DESTINATION_ID = "destination-1";
+ public static final String CURRENCY = "RUB";
+ public static final String CREATED_AT = "2024-01-10T10:15:30Z";
+ public static final String OCCURRED_AT = "2024-01-10T10:15:35Z";
+
+ private WithdrawalEventTestUtils() {
+ }
+
+ public static TimestampedChange createdChange(long requestedAmount, Integer providerId, Integer terminalId) {
+ Withdrawal withdrawal = new Withdrawal()
+ .setId(WITHDRAWAL_ID)
+ .setPartyId(PARTY_ID)
+ .setWalletId(WALLET_ID)
+ .setDestinationId(DESTINATION_ID)
+ .setBody(new Cash().setAmount(requestedAmount).setCurrency(new CurrencyRef(CURRENCY)))
+ .setCreatedAt(CREATED_AT)
+ .setDomainRevision(1L);
+
+ if (providerId != null) {
+ Route route = new Route().setProviderId(providerId);
+ if (terminalId != null) {
+ route.setTerminalId(terminalId);
+ }
+ withdrawal.setRoute(route);
+ }
+
+ return new TimestampedChange()
+ .setOccuredAt(OCCURRED_AT)
+ .setChange(Change.created(new CreatedChange().setWithdrawal(withdrawal)));
+ }
+
+ public static TimestampedChange routeChange(int providerId, Integer terminalId) {
+ Route route = new Route().setProviderId(providerId);
+ if (terminalId != null) {
+ route.setTerminalId(terminalId);
+ }
+
+ return new TimestampedChange()
+ .setOccuredAt(OCCURRED_AT)
+ .setChange(Change.route(new RouteChange().setRoute(route)));
+ }
+
+ public static TimestampedChange transferCreatedChange(
+ long amount,
+ long systemFee,
+ long providerFee) {
+ FinalCashFlow cashFlow = new FinalCashFlow().setPostings(List.of(
+ walletSenderSettlementToReceiverDestination(amount),
+ walletSenderSettlementToSystem(systemFee),
+ systemToProvider(providerFee),
+ unrelatedPosting(999L)));
+
+ Transfer transfer = new Transfer()
+ .setId("transfer-1")
+ .setCashflow(cashFlow);
+
+ return new TimestampedChange()
+ .setOccuredAt(OCCURRED_AT)
+ .setChange(Change.transfer(new TransferChange().setPayload(
+ dev.vality.fistful.transfer.Change.created(
+ new dev.vality.fistful.transfer.CreatedChange().setTransfer(transfer)))));
+ }
+
+ public static TimestampedChange pendingStatusChange() {
+ return new TimestampedChange()
+ .setOccuredAt(OCCURRED_AT)
+ .setChange(Change.status_changed(new StatusChange().setStatus(Status.pending(new Pending()))));
+ }
+
+ public static TimestampedChange succeededStatusChange() {
+ return new TimestampedChange()
+ .setOccuredAt(OCCURRED_AT)
+ .setChange(Change.status_changed(new StatusChange().setStatus(Status.succeeded(new Succeeded()))));
+ }
+
+ public static MachineEvent machineEvent(long eventId, TimestampedChange timestampedChange) {
+ return new MachineEvent()
+ .setSourceNs("withdrawal")
+ .setSourceId(WITHDRAWAL_ID)
+ .setEventId(eventId)
+ .setCreatedAt(TypeUtil.temporalToString(Instant.parse(OCCURRED_AT)))
+ .setData(dev.vality.machinegun.msgpack.Value.bin(Geck.toMsgPack(timestampedChange)));
+ }
+
+ public static SinkEvent sinkEvent(long eventId, TimestampedChange timestampedChange) {
+ return SinkEvent.event(machineEvent(eventId, timestampedChange));
+ }
+
+ public static List> fullSuccessFlow() {
+ return List.of(
+ sinkEvent(1L, createdChange(1500L, null, null)),
+ sinkEvent(2L, routeChange(42, 24)),
+ sinkEvent(3L, transferCreatedChange(1200L, 100L, 20L)),
+ sinkEvent(4L, succeededStatusChange()));
+ }
+
+ public static FinalCashFlowPosting walletSenderSettlementToReceiverDestination(long amount) {
+ return posting(
+ CashFlowAccount.wallet(WalletCashFlowAccount.sender_settlement),
+ CashFlowAccount.wallet(WalletCashFlowAccount.receiver_destination),
+ amount);
+ }
+
+ public static FinalCashFlowPosting walletSenderSettlementToSystem(long amount) {
+ return posting(
+ CashFlowAccount.wallet(WalletCashFlowAccount.sender_settlement),
+ CashFlowAccount.system(SystemCashFlowAccount.settlement),
+ amount);
+ }
+
+ public static FinalCashFlowPosting systemToProvider(long amount) {
+ return posting(
+ CashFlowAccount.system(SystemCashFlowAccount.settlement),
+ CashFlowAccount.provider(ProviderCashFlowAccount.settlement),
+ amount);
+ }
+
+ public static FinalCashFlowPosting systemToExternal(long amount) {
+ return posting(
+ CashFlowAccount.system(SystemCashFlowAccount.settlement),
+ CashFlowAccount.external(ExternalCashFlowAccount.outcome),
+ amount);
+ }
+
+ public static FinalCashFlowPosting unrelatedPosting(long amount) {
+ return posting(
+ CashFlowAccount.provider(ProviderCashFlowAccount.settlement),
+ CashFlowAccount.system(SystemCashFlowAccount.settlement),
+ amount);
+ }
+
+ private static FinalCashFlowPosting posting(CashFlowAccount source, CashFlowAccount destination, long amount) {
+ return new FinalCashFlowPosting()
+ .setSource(new FinalCashFlowAccount().setAccountType(source))
+ .setDestination(new FinalCashFlowAccount().setAccountType(destination))
+ .setVolume(new Cash().setAmount(amount).setCurrency(new CurrencyRef(CURRENCY)));
+ }
+}
diff --git a/src/test/resources/application.properties b/src/test/resources/application.properties
index 9bfd0ae6..3374f194 100644
--- a/src/test/resources/application.properties
+++ b/src/test/resources/application.properties
@@ -1 +1,2 @@
clickhouse.flyway.enabled=false
+kafka.topic.withdrawal.initial=mg-events-withdrawal