From 069aa8a005d7ca7b6b7e337a5e860e5e3e4a0262 Mon Sep 17 00:00:00 2001 From: vitaxa Date: Wed, 4 Mar 2026 15:21:58 +0300 Subject: [PATCH 1/4] add withdrawal listener --- pom.xml | 5 + .../computer/WithdrawalCashFlowComputer.java | 86 ++++++++ .../vality/analytics/config/KafkaConfig.java | 13 ++ .../analytics/config/SerializeConfig.java | 17 ++ .../vality/analytics/constant/EventType.java | 5 +- .../analytics/constant/WithdrawalStatus.java | 7 + .../model/AcceptanceDisbursementRawRow.java | 22 ++ .../analytics/dao/model/WithdrawalRow.java | 34 +++ .../dao/model/WithdrawalStateSnapshot.java | 32 +++ ...ithdrawalBatchPreparedStatementSetter.java | 62 ++++++ .../ClickHouseWithdrawalRepository.java | 36 ++++ .../PostgresWithdrawalStateRepository.java | 96 +++++++++ .../listener/WithdrawalListener.java | 54 +++++ .../withdrawal/WithdrawalEventContext.java | 21 ++ .../withdrawal/WithdrawalEventHandler.java | 185 ++++++++++++++++ .../withdrawal/WithdrawalMappingResult.java | 15 ++ .../withdrawal/AbstractWithdrawalMapper.java | 79 +++++++ .../withdrawal/WithdrawalCreatedMapper.java | 48 +++++ .../mapper/withdrawal/WithdrawalMapper.java | 9 + .../withdrawal/WithdrawalRouteMapper.java | 35 +++ .../withdrawal/WithdrawalStatusMapper.java | 65 ++++++ .../withdrawal/WithdrawalTransferMapper.java | 55 +++++ src/main/resources/application.yml | 5 + ...ate_shop_dictionary_with_composite_key.sql | 43 ++++ .../V4__create_withdrawal_table.sql | 22 ++ ...ate_shop_dictionary_with_composite_key.sql | 43 ++++ .../sharded/V4__create_withdrawal_table.sql | 25 +++ .../migration/V21__add_withdrawal_state.sql | 18 ++ .../WithdrawalCashFlowComputerTest.java | 48 +++++ .../vality/analytics/config/KafkaTest.java | 3 +- .../listener/WithdrawalListenerTest.java | 93 ++++++++ .../WithdrawalEventHandlerTest.java | 199 ++++++++++++++++++ .../utils/WithdrawalEventTestUtils.java | 174 +++++++++++++++ src/test/resources/application.properties | 1 + 34 files changed, 1653 insertions(+), 2 deletions(-) create mode 100644 src/main/java/dev/vality/analytics/computer/WithdrawalCashFlowComputer.java create mode 100644 src/main/java/dev/vality/analytics/constant/WithdrawalStatus.java create mode 100644 src/main/java/dev/vality/analytics/dao/model/AcceptanceDisbursementRawRow.java create mode 100644 src/main/java/dev/vality/analytics/dao/model/WithdrawalRow.java create mode 100644 src/main/java/dev/vality/analytics/dao/model/WithdrawalStateSnapshot.java create mode 100644 src/main/java/dev/vality/analytics/dao/repository/clickhouse/ClickHouseWithdrawalBatchPreparedStatementSetter.java create mode 100644 src/main/java/dev/vality/analytics/dao/repository/clickhouse/ClickHouseWithdrawalRepository.java create mode 100644 src/main/java/dev/vality/analytics/dao/repository/postgres/PostgresWithdrawalStateRepository.java create mode 100644 src/main/java/dev/vality/analytics/listener/WithdrawalListener.java create mode 100644 src/main/java/dev/vality/analytics/listener/handler/withdrawal/WithdrawalEventContext.java create mode 100644 src/main/java/dev/vality/analytics/listener/handler/withdrawal/WithdrawalEventHandler.java create mode 100644 src/main/java/dev/vality/analytics/listener/handler/withdrawal/WithdrawalMappingResult.java create mode 100644 src/main/java/dev/vality/analytics/listener/mapper/withdrawal/AbstractWithdrawalMapper.java create mode 100644 src/main/java/dev/vality/analytics/listener/mapper/withdrawal/WithdrawalCreatedMapper.java create mode 100644 src/main/java/dev/vality/analytics/listener/mapper/withdrawal/WithdrawalMapper.java create mode 100644 src/main/java/dev/vality/analytics/listener/mapper/withdrawal/WithdrawalRouteMapper.java create mode 100644 src/main/java/dev/vality/analytics/listener/mapper/withdrawal/WithdrawalStatusMapper.java create mode 100644 src/main/java/dev/vality/analytics/listener/mapper/withdrawal/WithdrawalTransferMapper.java create mode 100644 src/main/resources/db/migration-clickhouse/non-sharded/V3__recreate_shop_dictionary_with_composite_key.sql create mode 100644 src/main/resources/db/migration-clickhouse/non-sharded/V4__create_withdrawal_table.sql create mode 100644 src/main/resources/db/migration-clickhouse/sharded/V3__recreate_shop_dictionary_with_composite_key.sql create mode 100644 src/main/resources/db/migration-clickhouse/sharded/V4__create_withdrawal_table.sql create mode 100644 src/main/resources/db/migration/V21__add_withdrawal_state.sql create mode 100644 src/test/java/dev/vality/analytics/computer/WithdrawalCashFlowComputerTest.java create mode 100644 src/test/java/dev/vality/analytics/listener/WithdrawalListenerTest.java create mode 100644 src/test/java/dev/vality/analytics/listener/handler/withdrawal/WithdrawalEventHandlerTest.java create mode 100644 src/test/java/dev/vality/analytics/utils/WithdrawalEventTestUtils.java diff --git a/pom.xml b/pom.xml index a2e69dc4..8bef9716 100644 --- a/pom.xml +++ b/pom.xml @@ -52,6 +52,11 @@ machinegun-proto 1.43-3decc8f + + dev.vality + fistful-proto + 1.188-f7ce08e + dev.vality.geck filter diff --git a/src/main/java/dev/vality/analytics/computer/WithdrawalCashFlowComputer.java b/src/main/java/dev/vality/analytics/computer/WithdrawalCashFlowComputer.java new file mode 100644 index 00000000..23009005 --- /dev/null +++ b/src/main/java/dev/vality/analytics/computer/WithdrawalCashFlowComputer.java @@ -0,0 +1,86 @@ +package dev.vality.analytics.computer; + +import dev.vality.analytics.domain.CashFlowResult; +import dev.vality.fistful.cashflow.ExternalCashFlowAccount; +import dev.vality.fistful.cashflow.FinalCashFlowPosting; +import dev.vality.fistful.cashflow.MerchantCashFlowAccount; +import dev.vality.fistful.cashflow.ProviderCashFlowAccount; +import dev.vality.fistful.cashflow.SystemCashFlowAccount; +import org.springframework.stereotype.Service; +import org.springframework.util.CollectionUtils; + +import java.util.List; + +@Service +public class WithdrawalCashFlowComputer { + + public CashFlowResult compute(List cashFlow) { + long amount = 0L; + long systemFee = 0L; + long providerFee = 0L; + long externalFee = 0L; + + if (CollectionUtils.isEmpty(cashFlow)) { + return CashFlowResult.EMPTY; + } + + for (FinalCashFlowPosting posting : cashFlow) { + if (posting == null || !posting.isSetSource() || !posting.isSetDestination() || !posting.isSetVolume()) { + continue; + } + + if (isAmount(posting)) { + amount += posting.getVolume().getAmount(); + } + + if (isSystemFee(posting)) { + systemFee += posting.getVolume().getAmount(); + } + + if (isProviderFee(posting)) { + providerFee += posting.getVolume().getAmount(); + } + + if (isExternalFee(posting)) { + externalFee += posting.getVolume().getAmount(); + } + } + + return CashFlowResult.builder() + .amount(amount) + .guaranteeDeposit(0L) + .systemFee(systemFee) + .providerFee(providerFee) + .externalFee(externalFee) + .build(); + } + + private boolean isAmount(FinalCashFlowPosting posting) { + return posting.getSource().getAccountType().isSetMerchant() + && posting.getSource().getAccountType().getMerchant() == MerchantCashFlowAccount.settlement + && posting.getDestination().getAccountType().isSetMerchant() + && posting.getDestination().getAccountType().getMerchant() == MerchantCashFlowAccount.payout; + } + + private boolean isSystemFee(FinalCashFlowPosting posting) { + return posting.getSource().getAccountType().isSetMerchant() + && posting.getSource().getAccountType().getMerchant() == MerchantCashFlowAccount.settlement + && posting.getDestination().getAccountType().isSetSystem() + && posting.getDestination().getAccountType().getSystem() == SystemCashFlowAccount.settlement; + } + + private boolean isProviderFee(FinalCashFlowPosting posting) { + return posting.getSource().getAccountType().isSetSystem() + && posting.getSource().getAccountType().getSystem() == SystemCashFlowAccount.settlement + && posting.getDestination().getAccountType().isSetProvider() + && posting.getDestination().getAccountType().getProvider() == ProviderCashFlowAccount.settlement; + } + + private boolean isExternalFee(FinalCashFlowPosting posting) { + return posting.getSource().getAccountType().isSetSystem() + && posting.getSource().getAccountType().getSystem() == SystemCashFlowAccount.settlement + && posting.getDestination().getAccountType().isSetExternal() + && (posting.getDestination().getAccountType().getExternal() == ExternalCashFlowAccount.income + || posting.getDestination().getAccountType().getExternal() == ExternalCashFlowAccount.outcome); + } +} diff --git a/src/main/java/dev/vality/analytics/config/KafkaConfig.java b/src/main/java/dev/vality/analytics/config/KafkaConfig.java index 600b39e8..0a09f3ed 100644 --- a/src/main/java/dev/vality/analytics/config/KafkaConfig.java +++ b/src/main/java/dev/vality/analytics/config/KafkaConfig.java @@ -36,6 +36,7 @@ public class KafkaConfig { private static final String RESULT_ANALYTICS = "result-analytics"; private static final String PARTY_RESULT_ANALYTICS = "party-result-analytics"; private static final String DOMINANT_ANALYTICS = "dominant-analytics"; + private static final String WITHDRAWAL_ANALYTICS = "withdrawal-analytics"; private final ConsumerGroupIdService consumerGroupIdService; private final KafkaProperties kafkaProperties; @@ -53,6 +54,8 @@ public class KafkaConfig { private String maxPollRecordsDominantListener; @Value("${kafka.topic.rate.max.poll.records}") private String maxPollRecordsRatesListener; + @Value("${kafka.topic.withdrawal.max.poll.records}") + private String maxPollRecordsWithdrawalListener; @Value("${kafka.consumer.concurrency}") private int concurrencyListeners; @Value("${kafka.topic.rate.groupId}") @@ -88,6 +91,16 @@ public ConcurrentKafkaListenerContainerFactory dominan return factory; } + @Bean + public ConcurrentKafkaListenerContainerFactory withdrawalListenerContainerFactory() { + ConcurrentKafkaListenerContainerFactory factory = + new ConcurrentKafkaListenerContainerFactory<>(); + String consumerGroup = consumerGroupIdService.generateGroupId(WITHDRAWAL_ANALYTICS); + initDefaultListenerProperties(factory, consumerGroup, + new MachineEventDeserializer(), maxPollRecordsWithdrawalListener); + return factory; + } + @Bean public ConcurrentKafkaListenerContainerFactory rateContainerFactory() { ConcurrentKafkaListenerContainerFactory factory = diff --git a/src/main/java/dev/vality/analytics/config/SerializeConfig.java b/src/main/java/dev/vality/analytics/config/SerializeConfig.java index e601706d..6886ad98 100644 --- a/src/main/java/dev/vality/analytics/config/SerializeConfig.java +++ b/src/main/java/dev/vality/analytics/config/SerializeConfig.java @@ -1,5 +1,6 @@ package dev.vality.analytics.config; +import dev.vality.fistful.withdrawal.TimestampedChange; import dev.vality.damsel.domain_config_v2.HistoricalCommit; import dev.vality.exrates.events.CurrencyEvent; import dev.vality.geck.serializer.Geck; @@ -33,4 +34,20 @@ public HistoricalCommit deserialize(byte[] bytes) { }); } + @Bean + public BinaryDeserializer withdrawalTimestampedChangeBinaryDeserializer() { + return new AbstractThriftBinaryDeserializer<>() { + @Override + public TimestampedChange deserialize(byte[] bytes) { + return Geck.msgPackToTBase(bytes, TimestampedChange.class); + } + }; + } + + @Bean + public MachineEventParser withdrawalTimestampedChangeMachineEventParser( + BinaryDeserializer withdrawalTimestampedChangeBinaryDeserializer) { + return new MachineEventParser<>(withdrawalTimestampedChangeBinaryDeserializer); + } + } diff --git a/src/main/java/dev/vality/analytics/constant/EventType.java b/src/main/java/dev/vality/analytics/constant/EventType.java index fc943c0e..dc35dbb2 100644 --- a/src/main/java/dev/vality/analytics/constant/EventType.java +++ b/src/main/java/dev/vality/analytics/constant/EventType.java @@ -24,7 +24,10 @@ public enum EventType { ".payload.session_transaction_bound", new IsNullCondition().not()), INVOICE_PAYMENT_RISK_SCORE_CHANGED("invoice_payment_change.payload.invoice_payment_risk_score_changed", new IsNullCondition().not()), - RATE_CREATED("created", new IsNullCondition().not()); + WITHDRAWAL_CREATED("change.created", new IsNullCondition().not()), + WITHDRAWAL_ROUTE_CHANGED("change.route", new IsNullCondition().not()), + WITHDRAWAL_TRANSFER_CHANGED("change.transfer.payload.created", new IsNullCondition().not()), + WITHDRAWAL_STATUS_CHANGED("change.status_changed", new IsNullCondition().not()); Filter filter; diff --git a/src/main/java/dev/vality/analytics/constant/WithdrawalStatus.java b/src/main/java/dev/vality/analytics/constant/WithdrawalStatus.java new file mode 100644 index 00000000..69f7fd31 --- /dev/null +++ b/src/main/java/dev/vality/analytics/constant/WithdrawalStatus.java @@ -0,0 +1,7 @@ +package dev.vality.analytics.constant; + +public enum WithdrawalStatus { + pending, + succeeded, + failed +} diff --git a/src/main/java/dev/vality/analytics/dao/model/AcceptanceDisbursementRawRow.java b/src/main/java/dev/vality/analytics/dao/model/AcceptanceDisbursementRawRow.java new file mode 100644 index 00000000..cff3f7c7 --- /dev/null +++ b/src/main/java/dev/vality/analytics/dao/model/AcceptanceDisbursementRawRow.java @@ -0,0 +1,22 @@ +package dev.vality.analytics.dao.model; + +import lombok.AllArgsConstructor; +import lombok.Builder; +import lombok.Data; +import lombok.NoArgsConstructor; + +import java.time.LocalDate; + +@Data +@Builder +@NoArgsConstructor +@AllArgsConstructor +public class AcceptanceDisbursementRawRow { + + private LocalDate date; + private String currency; + private String locationUrl; + private long turnover; + private long cost; + +} diff --git a/src/main/java/dev/vality/analytics/dao/model/WithdrawalRow.java b/src/main/java/dev/vality/analytics/dao/model/WithdrawalRow.java new file mode 100644 index 00000000..1345a97b --- /dev/null +++ b/src/main/java/dev/vality/analytics/dao/model/WithdrawalRow.java @@ -0,0 +1,34 @@ +package dev.vality.analytics.dao.model; + +import dev.vality.analytics.constant.WithdrawalStatus; +import lombok.AllArgsConstructor; +import lombok.Builder; +import lombok.Data; +import lombok.NoArgsConstructor; + +import java.time.LocalDateTime; + +@Data +@Builder +@NoArgsConstructor +@AllArgsConstructor +public class WithdrawalRow { + + private LocalDateTime eventTime; + private String partyId; + private String withdrawalId; + private long sequenceId; + private LocalDateTime withdrawalTime; + private String walletId; + private String destinationId; + private String providerId; + private String terminal; + private long amount; + private long guaranteeDeposit; + private long systemFee; + private long providerFee; + private long externalFee; + private String currency; + private WithdrawalStatus status; + +} diff --git a/src/main/java/dev/vality/analytics/dao/model/WithdrawalStateSnapshot.java b/src/main/java/dev/vality/analytics/dao/model/WithdrawalStateSnapshot.java new file mode 100644 index 00000000..d26f2bc5 --- /dev/null +++ b/src/main/java/dev/vality/analytics/dao/model/WithdrawalStateSnapshot.java @@ -0,0 +1,32 @@ +package dev.vality.analytics.dao.model; + +import lombok.AllArgsConstructor; +import lombok.Builder; +import lombok.Data; +import lombok.NoArgsConstructor; + +import java.time.LocalDateTime; + +@Data +@Builder(toBuilder = true) +@NoArgsConstructor +@AllArgsConstructor +public class WithdrawalStateSnapshot { + + private String withdrawalId; + private String partyId; + private String walletId; + private String destinationId; + private String currency; + private Long requestedAmount; + private Long amount; + private Long systemFee; + private Long providerFee; + private Long externalFee; + private LocalDateTime withdrawalCreatedAt; + private String providerId; + private String terminal; + private long lastSequenceId; + private LocalDateTime updatedAt; + +} diff --git a/src/main/java/dev/vality/analytics/dao/repository/clickhouse/ClickHouseWithdrawalBatchPreparedStatementSetter.java b/src/main/java/dev/vality/analytics/dao/repository/clickhouse/ClickHouseWithdrawalBatchPreparedStatementSetter.java new file mode 100644 index 00000000..3fa4ce32 --- /dev/null +++ b/src/main/java/dev/vality/analytics/dao/repository/clickhouse/ClickHouseWithdrawalBatchPreparedStatementSetter.java @@ -0,0 +1,62 @@ +package dev.vality.analytics.dao.repository.clickhouse; + +import dev.vality.analytics.constant.ClickHouseUtilsValue; +import dev.vality.analytics.dao.model.WithdrawalRow; +import org.springframework.jdbc.core.BatchPreparedStatementSetter; + +import java.sql.PreparedStatement; +import java.sql.SQLException; +import java.time.ZoneOffset; +import java.time.temporal.ChronoUnit; +import java.util.List; + +public class ClickHouseWithdrawalBatchPreparedStatementSetter implements BatchPreparedStatementSetter { + + public static final String INSERT = "INSERT INTO analytic.events_sink_withdrawal " + + "(timestamp, eventTime, eventTimeHour, partyId, withdrawalId, sequenceId, withdrawalTime, walletId, " + + "destinationId, providerId, terminal, amount, guaranteeDeposit, systemFee, providerFee, externalFee, " + + "currency, status) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)"; + + private final List batch; + + public ClickHouseWithdrawalBatchPreparedStatementSetter(List batch) { + this.batch = batch; + } + + @Override + public void setValues(PreparedStatement ps, int i) throws SQLException { + WithdrawalRow row = batch.get(i); + int column = 1; + ps.setObject(column++, row.getEventTime().toLocalDate()); + ps.setLong(column++, row.getEventTime().toEpochSecond(ZoneOffset.UTC)); + ps.setLong(column++, row.getEventTime().toInstant(ZoneOffset.UTC).truncatedTo(ChronoUnit.HOURS).toEpochMilli()); + ps.setString(column++, defaultString(row.getPartyId())); + ps.setString(column++, defaultString(row.getWithdrawalId())); + ps.setLong(column++, row.getSequenceId()); + ps.setLong(column++, row.getWithdrawalTime().toEpochSecond(ZoneOffset.UTC)); + ps.setString(column++, defaultString(row.getWalletId())); + ps.setString(column++, defaultString(row.getDestinationId())); + ps.setString(column++, defaultString(row.getProviderId())); + ps.setString(column++, defaultString(row.getTerminal())); + ps.setLong(column++, safeUnsigned(row.getAmount())); + ps.setLong(column++, safeUnsigned(row.getGuaranteeDeposit())); + ps.setLong(column++, safeUnsigned(row.getSystemFee())); + ps.setLong(column++, safeUnsigned(row.getProviderFee())); + ps.setLong(column++, safeUnsigned(row.getExternalFee())); + ps.setString(column++, defaultString(row.getCurrency())); + ps.setString(column, row.getStatus().name()); + } + + @Override + public int getBatchSize() { + return batch.size(); + } + + private String defaultString(String value) { + return value != null ? value : ClickHouseUtilsValue.UNKNOWN; + } + + private long safeUnsigned(long value) { + return Math.max(0L, value); + } +} diff --git a/src/main/java/dev/vality/analytics/dao/repository/clickhouse/ClickHouseWithdrawalRepository.java b/src/main/java/dev/vality/analytics/dao/repository/clickhouse/ClickHouseWithdrawalRepository.java new file mode 100644 index 00000000..1af9425a --- /dev/null +++ b/src/main/java/dev/vality/analytics/dao/repository/clickhouse/ClickHouseWithdrawalRepository.java @@ -0,0 +1,36 @@ +package dev.vality.analytics.dao.repository.clickhouse; + +import dev.vality.analytics.dao.model.WithdrawalRow; +import lombok.RequiredArgsConstructor; +import lombok.extern.slf4j.Slf4j; +import org.springframework.jdbc.core.JdbcTemplate; +import org.springframework.retry.annotation.Backoff; +import org.springframework.retry.annotation.Retryable; +import org.springframework.stereotype.Service; +import org.springframework.util.CollectionUtils; + +import java.sql.SQLException; +import java.util.List; + +@Slf4j +@Service +@RequiredArgsConstructor +public class ClickHouseWithdrawalRepository { + + private final JdbcTemplate clickHouseJdbcTemplate; + + @Retryable(retryFor = SQLException.class, backoff = @Backoff(delay = 5000)) + public void insertBatch(List withdrawalRows) { + if (CollectionUtils.isEmpty(withdrawalRows)) { + return; + } + + log.info("Batch start insert withdrawalRows: {} firstElement: {}", + withdrawalRows.size(), withdrawalRows.get(0).getWithdrawalId()); + clickHouseJdbcTemplate.batchUpdate( + ClickHouseWithdrawalBatchPreparedStatementSetter.INSERT, + new ClickHouseWithdrawalBatchPreparedStatementSetter(withdrawalRows)); + log.info("Batch inserted withdrawalRows: {} firstElement: {}", + withdrawalRows.size(), withdrawalRows.get(0).getWithdrawalId()); + } +} diff --git a/src/main/java/dev/vality/analytics/dao/repository/postgres/PostgresWithdrawalStateRepository.java b/src/main/java/dev/vality/analytics/dao/repository/postgres/PostgresWithdrawalStateRepository.java new file mode 100644 index 00000000..aa16cae0 --- /dev/null +++ b/src/main/java/dev/vality/analytics/dao/repository/postgres/PostgresWithdrawalStateRepository.java @@ -0,0 +1,96 @@ +package dev.vality.analytics.dao.repository.postgres; + +import dev.vality.analytics.dao.model.WithdrawalStateSnapshot; +import lombok.RequiredArgsConstructor; +import lombok.extern.slf4j.Slf4j; +import org.springframework.jdbc.core.JdbcTemplate; +import org.springframework.stereotype.Service; + +import java.sql.ResultSet; +import java.sql.SQLException; +import java.util.Optional; + +@Slf4j +@Service +@RequiredArgsConstructor +public class PostgresWithdrawalStateRepository { + + private static final String UPSERT = "INSERT INTO analytics.withdrawal_state " + + "(withdrawal_id, party_id, wallet_id, destination_id, currency, requested_amount, amount, system_fee, " + + "provider_fee, external_fee, withdrawal_created_at, provider_id, terminal, last_sequence_id, updated_at) " + + "VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?) " + + "ON CONFLICT (withdrawal_id) DO UPDATE SET " + + "party_id = EXCLUDED.party_id, " + + "wallet_id = EXCLUDED.wallet_id, " + + "destination_id = EXCLUDED.destination_id, " + + "currency = EXCLUDED.currency, " + + "requested_amount = EXCLUDED.requested_amount, " + + "amount = EXCLUDED.amount, " + + "system_fee = EXCLUDED.system_fee, " + + "provider_fee = EXCLUDED.provider_fee, " + + "external_fee = EXCLUDED.external_fee, " + + "withdrawal_created_at = EXCLUDED.withdrawal_created_at, " + + "provider_id = EXCLUDED.provider_id, " + + "terminal = EXCLUDED.terminal, " + + "last_sequence_id = EXCLUDED.last_sequence_id, " + + "updated_at = EXCLUDED.updated_at"; + private static final String SELECT = "SELECT withdrawal_id, party_id, wallet_id, destination_id, currency, " + + "requested_amount, amount, system_fee, provider_fee, external_fee, withdrawal_created_at, provider_id, " + + "terminal, last_sequence_id, updated_at " + + "FROM analytics.withdrawal_state WHERE withdrawal_id = ?"; + + private final JdbcTemplate postgresJdbcTemplate; + + public Optional findByWithdrawalId(String withdrawalId) { + return postgresJdbcTemplate.query( + SELECT, + (resultSet, rowNum) -> map(resultSet), + withdrawalId) + .stream() + .findFirst(); + } + + public void upsert(WithdrawalStateSnapshot snapshot) { + postgresJdbcTemplate.update( + UPSERT, + snapshot.getWithdrawalId(), + snapshot.getPartyId(), + snapshot.getWalletId(), + snapshot.getDestinationId(), + snapshot.getCurrency(), + snapshot.getRequestedAmount(), + snapshot.getAmount(), + snapshot.getSystemFee(), + snapshot.getProviderFee(), + snapshot.getExternalFee(), + snapshot.getWithdrawalCreatedAt(), + snapshot.getProviderId(), + snapshot.getTerminal(), + snapshot.getLastSequenceId(), + snapshot.getUpdatedAt()); + log.debug("Upserted withdrawal state, withdrawalId={}, sequenceId={}", + snapshot.getWithdrawalId(), snapshot.getLastSequenceId()); + } + + private WithdrawalStateSnapshot map(ResultSet resultSet) throws SQLException { + return WithdrawalStateSnapshot.builder() + .withdrawalId(resultSet.getString("withdrawal_id")) + .partyId(resultSet.getString("party_id")) + .walletId(resultSet.getString("wallet_id")) + .destinationId(resultSet.getString("destination_id")) + .currency(resultSet.getString("currency")) + .requestedAmount((Long) resultSet.getObject("requested_amount")) + .amount((Long) resultSet.getObject("amount")) + .systemFee((Long) resultSet.getObject("system_fee")) + .providerFee((Long) resultSet.getObject("provider_fee")) + .externalFee((Long) resultSet.getObject("external_fee")) + .withdrawalCreatedAt(resultSet.getTimestamp("withdrawal_created_at") != null + ? resultSet.getTimestamp("withdrawal_created_at").toLocalDateTime() + : null) + .providerId(resultSet.getString("provider_id")) + .terminal(resultSet.getString("terminal")) + .lastSequenceId(resultSet.getLong("last_sequence_id")) + .updatedAt(resultSet.getTimestamp("updated_at").toLocalDateTime()) + .build(); + } +} diff --git a/src/main/java/dev/vality/analytics/listener/WithdrawalListener.java b/src/main/java/dev/vality/analytics/listener/WithdrawalListener.java new file mode 100644 index 00000000..f6f28fd9 --- /dev/null +++ b/src/main/java/dev/vality/analytics/listener/WithdrawalListener.java @@ -0,0 +1,54 @@ +package dev.vality.analytics.listener; + +import dev.vality.analytics.listener.handler.withdrawal.WithdrawalEventHandler; +import dev.vality.machinegun.eventsink.MachineEvent; +import lombok.RequiredArgsConstructor; +import lombok.extern.slf4j.Slf4j; +import org.springframework.beans.factory.annotation.Value; +import org.springframework.kafka.annotation.KafkaListener; +import org.springframework.kafka.support.Acknowledgment; +import org.springframework.kafka.support.KafkaHeaders; +import org.springframework.messaging.handler.annotation.Header; +import org.springframework.stereotype.Service; +import org.springframework.util.CollectionUtils; + +import java.time.Duration; +import java.util.List; + +@Slf4j +@Service +@RequiredArgsConstructor +public class WithdrawalListener { + + private final WithdrawalEventHandler withdrawalEventHandler; + + @Value("${kafka.consumer.throttling-timeout-ms}") + private int throttlingTimeout; + + @KafkaListener( + autoStartup = "${kafka.listener.withdrawal.enabled}", + topics = "${kafka.topic.withdrawal.initial}", + containerFactory = "withdrawalListenerContainerFactory") + public void listen( + List batch, + @Header(KafkaHeaders.RECEIVED_PARTITION) int partition, + @Header(KafkaHeaders.OFFSET) int offsets, + Acknowledgment ack) { + log.info("WithdrawalListener listen offsets: {}, partition: {}, batch.size: {}", + offsets, partition, batch.size()); + + try { + if (CollectionUtils.isEmpty(batch)) { + ack.acknowledge(); + return; + } + + withdrawalEventHandler.handle(batch); + ack.acknowledge(); + } catch (Exception e) { + log.error("Error when WithdrawalListener listen", e); + ack.nack(Duration.ofMillis(throttlingTimeout)); + throw e; + } + } +} diff --git a/src/main/java/dev/vality/analytics/listener/handler/withdrawal/WithdrawalEventContext.java b/src/main/java/dev/vality/analytics/listener/handler/withdrawal/WithdrawalEventContext.java new file mode 100644 index 00000000..56606f28 --- /dev/null +++ b/src/main/java/dev/vality/analytics/listener/handler/withdrawal/WithdrawalEventContext.java @@ -0,0 +1,21 @@ +package dev.vality.analytics.listener.handler.withdrawal; + +import dev.vality.analytics.dao.model.WithdrawalStateSnapshot; +import dev.vality.fistful.withdrawal.TimestampedChange; +import dev.vality.machinegun.eventsink.MachineEvent; +import lombok.Builder; +import lombok.Value; + +import java.time.LocalDateTime; + +@Value +@Builder +public class WithdrawalEventContext { + + MachineEvent machineEvent; + TimestampedChange timestampedChange; + LocalDateTime eventTime; + String withdrawalId; + WithdrawalStateSnapshot currentState; + +} diff --git a/src/main/java/dev/vality/analytics/listener/handler/withdrawal/WithdrawalEventHandler.java b/src/main/java/dev/vality/analytics/listener/handler/withdrawal/WithdrawalEventHandler.java new file mode 100644 index 00000000..0e67ec20 --- /dev/null +++ b/src/main/java/dev/vality/analytics/listener/handler/withdrawal/WithdrawalEventHandler.java @@ -0,0 +1,185 @@ +package dev.vality.analytics.listener.handler.withdrawal; + +import dev.vality.analytics.dao.model.WithdrawalRow; +import dev.vality.analytics.dao.model.WithdrawalStateSnapshot; +import dev.vality.analytics.dao.repository.clickhouse.ClickHouseWithdrawalRepository; +import dev.vality.analytics.dao.repository.postgres.PostgresWithdrawalStateRepository; +import dev.vality.analytics.listener.mapper.withdrawal.WithdrawalMapper; +import dev.vality.fistful.withdrawal.TimestampedChange; +import dev.vality.geck.common.util.TypeUtil; +import dev.vality.machinegun.eventsink.MachineEvent; +import dev.vality.sink.common.parser.impl.MachineEventParser; +import lombok.RequiredArgsConstructor; +import lombok.extern.slf4j.Slf4j; +import org.springframework.stereotype.Service; +import org.springframework.util.CollectionUtils; + +import java.time.LocalDateTime; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Optional; + +@Slf4j +@Service +@RequiredArgsConstructor +public class WithdrawalEventHandler { + + private final MachineEventParser withdrawalTimestampedChangeMachineEventParser; + private final PostgresWithdrawalStateRepository postgresWithdrawalStateRepository; + private final ClickHouseWithdrawalRepository clickHouseWithdrawalRepository; + private final List withdrawalMappers; + + public void handle(List batch) { + if (CollectionUtils.isEmpty(batch)) { + return; + } + + Map stateCache = new HashMap<>(); + List withdrawalRows = new ArrayList<>(); + + for (MachineEvent machineEvent : batch) { + WithdrawalEventContext context = prepareContext(machineEvent, stateCache); + if (context == null) { + continue; + } + + WithdrawalMappingResult result = map(context); + if (result == null) { + continue; + } + + apply(result, stateCache, withdrawalRows); + } + + clickHouseWithdrawalRepository.insertBatch(withdrawalRows); + } + + private WithdrawalEventContext prepareContext( + MachineEvent machineEvent, + Map stateCache) { + TimestampedChange timestampedChange = parse(machineEvent); + if (timestampedChange == null || !timestampedChange.isSetChange()) { + return null; + } + + LocalDateTime eventTime = parseTime(timestampedChange.getOccuredAt()); + if (eventTime == null) { + log.warn("Skipping withdrawal event with invalid occured_at, sourceId={}, eventId={}", + machineEvent.getSourceId(), machineEvent.getEventId()); + return null; + } + + String withdrawalId = resolveWithdrawalId(machineEvent, timestampedChange); + if (withdrawalId == null) { + log.warn("Skipping withdrawal event without withdrawal id, eventId={}", machineEvent.getEventId()); + return null; + } + + WithdrawalStateSnapshot currentState = getState(withdrawalId, stateCache); + if (currentState != null && machineEvent.getEventId() <= currentState.getLastSequenceId()) { + log.debug("Skipping stale withdrawal event, withdrawalId={}, eventId={}, lastSequenceId={}", + withdrawalId, machineEvent.getEventId(), currentState.getLastSequenceId()); + return null; + } + + return WithdrawalEventContext.builder() + .machineEvent(machineEvent) + .timestampedChange(timestampedChange) + .eventTime(eventTime) + .withdrawalId(withdrawalId) + .currentState(currentState) + .build(); + } + + private WithdrawalMappingResult map(WithdrawalEventContext context) { + for (WithdrawalMapper mapper : withdrawalMappers) { + if (mapper.accept(context.getTimestampedChange())) { + WithdrawalMappingResult result = mapper.map(context.getTimestampedChange(), context); + if (result == null) { + if (context.getCurrentState() == null) { + log.warn("Skipping {} change without reducer state, withdrawalId={}, eventId={}", + mapper.getChangeType(), + context.getWithdrawalId(), + context.getMachineEvent().getEventId()); + } else { + log.debug("Skipping {} change because mapper produced no update, withdrawalId={}, eventId={}", + mapper.getChangeType(), + context.getWithdrawalId(), + context.getMachineEvent().getEventId()); + } + } + return result; + } + } + + log.debug("No withdrawal mapper matched, withdrawalId={}, eventId={}", + context.getWithdrawalId(), context.getMachineEvent().getEventId()); + return null; + } + + private void apply( + WithdrawalMappingResult result, + Map stateCache, + List withdrawalRows) { + if (result.getStateSnapshot() != null) { + upsert(result.getStateSnapshot(), stateCache); + } + + if (result.getWithdrawalRow() != null) { + withdrawalRows.add(result.getWithdrawalRow()); + } + } + + private void upsert(WithdrawalStateSnapshot snapshot, Map stateCache) { + postgresWithdrawalStateRepository.upsert(snapshot); + stateCache.put(snapshot.getWithdrawalId(), snapshot); + } + + private WithdrawalStateSnapshot getState(String withdrawalId, Map stateCache) { + WithdrawalStateSnapshot cached = stateCache.get(withdrawalId); + if (cached != null) { + return cached; + } + + Optional stored = postgresWithdrawalStateRepository.findByWithdrawalId(withdrawalId); + stored.ifPresent(snapshot -> stateCache.put(withdrawalId, snapshot)); + return stored.orElse(null); + } + + private TimestampedChange parse(MachineEvent machineEvent) { + try { + return withdrawalTimestampedChangeMachineEventParser.parse(machineEvent); + } catch (Exception e) { + log.warn("Failed to parse withdrawal event, sourceId={}, eventId={}", + machineEvent.getSourceId(), machineEvent.getEventId(), e); + return null; + } + } + + private String resolveWithdrawalId(MachineEvent machineEvent, TimestampedChange timestampedChange) { + if (machineEvent.isSetSourceId()) { + return machineEvent.getSourceId(); + } + + if (timestampedChange.getChange().isSetCreated()) { + return timestampedChange.getChange().getCreated().getWithdrawal().getId(); + } + + return null; + } + + private LocalDateTime parseTime(String timestamp) { + if (timestamp == null) { + return null; + } + + try { + return TypeUtil.stringToLocalDateTime(timestamp); + } catch (Exception e) { + log.warn("Failed to parse withdrawal timestamp '{}'", timestamp, e); + return null; + } + } +} diff --git a/src/main/java/dev/vality/analytics/listener/handler/withdrawal/WithdrawalMappingResult.java b/src/main/java/dev/vality/analytics/listener/handler/withdrawal/WithdrawalMappingResult.java new file mode 100644 index 00000000..6048927c --- /dev/null +++ b/src/main/java/dev/vality/analytics/listener/handler/withdrawal/WithdrawalMappingResult.java @@ -0,0 +1,15 @@ +package dev.vality.analytics.listener.handler.withdrawal; + +import dev.vality.analytics.dao.model.WithdrawalRow; +import dev.vality.analytics.dao.model.WithdrawalStateSnapshot; +import lombok.Builder; +import lombok.Value; + +@Value +@Builder +public class WithdrawalMappingResult { + + WithdrawalStateSnapshot stateSnapshot; + WithdrawalRow withdrawalRow; + +} diff --git a/src/main/java/dev/vality/analytics/listener/mapper/withdrawal/AbstractWithdrawalMapper.java b/src/main/java/dev/vality/analytics/listener/mapper/withdrawal/AbstractWithdrawalMapper.java new file mode 100644 index 00000000..0d458a59 --- /dev/null +++ b/src/main/java/dev/vality/analytics/listener/mapper/withdrawal/AbstractWithdrawalMapper.java @@ -0,0 +1,79 @@ +package dev.vality.analytics.listener.mapper.withdrawal; + +import dev.vality.analytics.constant.WithdrawalStatus; +import dev.vality.analytics.listener.handler.withdrawal.WithdrawalEventContext; +import dev.vality.analytics.listener.handler.withdrawal.WithdrawalMappingResult; +import dev.vality.analytics.listener.mapper.AbstractMapper; +import dev.vality.fistful.withdrawal.TimestampedChange; +import dev.vality.geck.common.util.TypeUtil; + +import java.time.LocalDateTime; + +public abstract class AbstractWithdrawalMapper + extends AbstractMapper + implements WithdrawalMapper { + + protected LocalDateTime parseTime(String timestamp) { + if (timestamp == null) { + return null; + } + + try { + return TypeUtil.stringToLocalDateTime(timestamp); + } catch (Exception e) { + return null; + } + } + + protected String extractProviderId(dev.vality.fistful.withdrawal.Route route) { + if (route == null) { + return null; + } + + if (route.isSetProviderId()) { + return String.valueOf(route.getProviderId()); + } + + if (route.isSetProviderIdLegacy()) { + return route.getProviderIdLegacy(); + } + + return null; + } + + protected String extractTerminal(dev.vality.fistful.withdrawal.Route route) { + if (route == null) { + return null; + } + + if (route.isSetTerminalId()) { + return String.valueOf(route.getTerminalId()); + } + + if (route.isSetTerminalIdLegacy()) { + return route.getTerminalIdLegacy(); + } + + return null; + } + + protected WithdrawalStatus mapStatus(dev.vality.fistful.withdrawal.status.Status status) { + if (status == null) { + return null; + } + + if (status.isSetPending()) { + return WithdrawalStatus.pending; + } + + if (status.isSetSucceeded()) { + return WithdrawalStatus.succeeded; + } + + if (status.isSetFailed()) { + return WithdrawalStatus.failed; + } + + return null; + } +} diff --git a/src/main/java/dev/vality/analytics/listener/mapper/withdrawal/WithdrawalCreatedMapper.java b/src/main/java/dev/vality/analytics/listener/mapper/withdrawal/WithdrawalCreatedMapper.java new file mode 100644 index 00000000..c82b558f --- /dev/null +++ b/src/main/java/dev/vality/analytics/listener/mapper/withdrawal/WithdrawalCreatedMapper.java @@ -0,0 +1,48 @@ +package dev.vality.analytics.listener.mapper.withdrawal; + +import dev.vality.analytics.constant.EventType; +import dev.vality.analytics.dao.model.WithdrawalStateSnapshot; +import dev.vality.analytics.listener.handler.withdrawal.WithdrawalEventContext; +import dev.vality.analytics.listener.handler.withdrawal.WithdrawalMappingResult; +import dev.vality.fistful.withdrawal.TimestampedChange; +import dev.vality.fistful.withdrawal.Withdrawal; +import org.springframework.stereotype.Component; + +@Component +public class WithdrawalCreatedMapper extends AbstractWithdrawalMapper { + + @Override + public WithdrawalMappingResult map(TimestampedChange change, WithdrawalEventContext context) { + Withdrawal withdrawal = change.getChange().getCreated().getWithdrawal(); + WithdrawalStateSnapshot currentState = context.getCurrentState(); + WithdrawalStateSnapshot.WithdrawalStateSnapshotBuilder builder = currentState != null + ? currentState.toBuilder() + : WithdrawalStateSnapshot.builder().withdrawalId(context.getWithdrawalId()); + + builder.withdrawalId(context.getWithdrawalId()) + .partyId(withdrawal.getPartyId()) + .walletId(withdrawal.getWalletId()) + .destinationId(withdrawal.getDestinationId()) + .currency(withdrawal.getBody() != null && withdrawal.getBody().getCurrency() != null + ? withdrawal.getBody().getCurrency().getSymbolicCode() + : null) + .requestedAmount(withdrawal.getBody() != null ? withdrawal.getBody().getAmount() : null) + .withdrawalCreatedAt(parseTime(withdrawal.getCreatedAt())) + .lastSequenceId(context.getMachineEvent().getEventId()) + .updatedAt(context.getEventTime()); + + if (withdrawal.isSetRoute()) { + builder.providerId(extractProviderId(withdrawal.getRoute())) + .terminal(extractTerminal(withdrawal.getRoute())); + } + + return WithdrawalMappingResult.builder() + .stateSnapshot(builder.build()) + .build(); + } + + @Override + public EventType getChangeType() { + return EventType.WITHDRAWAL_CREATED; + } +} diff --git a/src/main/java/dev/vality/analytics/listener/mapper/withdrawal/WithdrawalMapper.java b/src/main/java/dev/vality/analytics/listener/mapper/withdrawal/WithdrawalMapper.java new file mode 100644 index 00000000..83d56bc9 --- /dev/null +++ b/src/main/java/dev/vality/analytics/listener/mapper/withdrawal/WithdrawalMapper.java @@ -0,0 +1,9 @@ +package dev.vality.analytics.listener.mapper.withdrawal; + +import dev.vality.analytics.listener.handler.withdrawal.WithdrawalEventContext; +import dev.vality.analytics.listener.handler.withdrawal.WithdrawalMappingResult; +import dev.vality.analytics.listener.mapper.Mapper; +import dev.vality.fistful.withdrawal.TimestampedChange; + +public interface WithdrawalMapper extends Mapper { +} diff --git a/src/main/java/dev/vality/analytics/listener/mapper/withdrawal/WithdrawalRouteMapper.java b/src/main/java/dev/vality/analytics/listener/mapper/withdrawal/WithdrawalRouteMapper.java new file mode 100644 index 00000000..568d27b6 --- /dev/null +++ b/src/main/java/dev/vality/analytics/listener/mapper/withdrawal/WithdrawalRouteMapper.java @@ -0,0 +1,35 @@ +package dev.vality.analytics.listener.mapper.withdrawal; + +import dev.vality.analytics.constant.EventType; +import dev.vality.analytics.dao.model.WithdrawalStateSnapshot; +import dev.vality.analytics.listener.handler.withdrawal.WithdrawalEventContext; +import dev.vality.analytics.listener.handler.withdrawal.WithdrawalMappingResult; +import dev.vality.fistful.withdrawal.TimestampedChange; +import org.springframework.stereotype.Component; + +@Component +public class WithdrawalRouteMapper extends AbstractWithdrawalMapper { + + @Override + public WithdrawalMappingResult map(TimestampedChange change, WithdrawalEventContext context) { + WithdrawalStateSnapshot currentState = context.getCurrentState(); + if (currentState == null) { + return null; + } + + dev.vality.fistful.withdrawal.Route route = change.getChange().getRoute().getRoute(); + return WithdrawalMappingResult.builder() + .stateSnapshot(currentState.toBuilder() + .providerId(extractProviderId(route)) + .terminal(extractTerminal(route)) + .lastSequenceId(context.getMachineEvent().getEventId()) + .updatedAt(context.getEventTime()) + .build()) + .build(); + } + + @Override + public EventType getChangeType() { + return EventType.WITHDRAWAL_ROUTE_CHANGED; + } +} diff --git a/src/main/java/dev/vality/analytics/listener/mapper/withdrawal/WithdrawalStatusMapper.java b/src/main/java/dev/vality/analytics/listener/mapper/withdrawal/WithdrawalStatusMapper.java new file mode 100644 index 00000000..4ad15212 --- /dev/null +++ b/src/main/java/dev/vality/analytics/listener/mapper/withdrawal/WithdrawalStatusMapper.java @@ -0,0 +1,65 @@ +package dev.vality.analytics.listener.mapper.withdrawal; + +import dev.vality.analytics.constant.EventType; +import dev.vality.analytics.constant.WithdrawalStatus; +import dev.vality.analytics.dao.model.WithdrawalRow; +import dev.vality.analytics.dao.model.WithdrawalStateSnapshot; +import dev.vality.analytics.listener.handler.withdrawal.WithdrawalEventContext; +import dev.vality.analytics.listener.handler.withdrawal.WithdrawalMappingResult; +import dev.vality.fistful.withdrawal.TimestampedChange; +import org.springframework.stereotype.Component; + +import java.util.Optional; + +@Component +public class WithdrawalStatusMapper extends AbstractWithdrawalMapper { + + @Override + public WithdrawalMappingResult map(TimestampedChange change, WithdrawalEventContext context) { + WithdrawalStateSnapshot currentState = context.getCurrentState(); + if (currentState == null) { + return null; + } + + WithdrawalStatus status = mapStatus(change.getChange().getStatusChanged().getStatus()); + if (status == null) { + return null; + } + + WithdrawalStateSnapshot updatedState = currentState.toBuilder() + .lastSequenceId(context.getMachineEvent().getEventId()) + .updatedAt(context.getEventTime()) + .build(); + + WithdrawalRow row = WithdrawalRow.builder() + .eventTime(context.getEventTime()) + .partyId(updatedState.getPartyId()) + .withdrawalId(updatedState.getWithdrawalId()) + .sequenceId(context.getMachineEvent().getEventId()) + .withdrawalTime(Optional.ofNullable(updatedState.getWithdrawalCreatedAt()) + .orElse(context.getEventTime())) + .walletId(updatedState.getWalletId()) + .destinationId(updatedState.getDestinationId()) + .providerId(updatedState.getProviderId()) + .terminal(updatedState.getTerminal()) + .amount(Optional.ofNullable(updatedState.getAmount()) + .orElse(Optional.ofNullable(updatedState.getRequestedAmount()).orElse(0L))) + .guaranteeDeposit(0L) + .systemFee(Optional.ofNullable(updatedState.getSystemFee()).orElse(0L)) + .providerFee(Optional.ofNullable(updatedState.getProviderFee()).orElse(0L)) + .externalFee(Optional.ofNullable(updatedState.getExternalFee()).orElse(0L)) + .currency(updatedState.getCurrency()) + .status(status) + .build(); + + return WithdrawalMappingResult.builder() + .stateSnapshot(updatedState) + .withdrawalRow(row) + .build(); + } + + @Override + public EventType getChangeType() { + return EventType.WITHDRAWAL_STATUS_CHANGED; + } +} diff --git a/src/main/java/dev/vality/analytics/listener/mapper/withdrawal/WithdrawalTransferMapper.java b/src/main/java/dev/vality/analytics/listener/mapper/withdrawal/WithdrawalTransferMapper.java new file mode 100644 index 00000000..0d0d3f5e --- /dev/null +++ b/src/main/java/dev/vality/analytics/listener/mapper/withdrawal/WithdrawalTransferMapper.java @@ -0,0 +1,55 @@ +package dev.vality.analytics.listener.mapper.withdrawal; + +import dev.vality.analytics.computer.WithdrawalCashFlowComputer; +import dev.vality.analytics.constant.EventType; +import dev.vality.analytics.dao.model.WithdrawalStateSnapshot; +import dev.vality.analytics.domain.CashFlowResult; +import dev.vality.analytics.listener.handler.withdrawal.WithdrawalEventContext; +import dev.vality.analytics.listener.handler.withdrawal.WithdrawalMappingResult; +import dev.vality.fistful.cashflow.FinalCashFlow; +import dev.vality.fistful.withdrawal.TimestampedChange; +import lombok.RequiredArgsConstructor; +import org.springframework.stereotype.Component; + +@Component +@RequiredArgsConstructor +public class WithdrawalTransferMapper extends AbstractWithdrawalMapper { + + private final WithdrawalCashFlowComputer withdrawalCashFlowComputer; + + @Override + public boolean accept(TimestampedChange change) { + return getChangeType().getFilter().match(change) + && change.getChange().getTransfer().isSetPayload() + && change.getChange().getTransfer().getPayload().isSetCreated(); + } + + @Override + public WithdrawalMappingResult map(TimestampedChange change, WithdrawalEventContext context) { + WithdrawalStateSnapshot currentState = context.getCurrentState(); + if (currentState == null) { + return null; + } + + dev.vality.fistful.transfer.Change transferChange = change.getChange().getTransfer().getPayload(); + FinalCashFlow cashFlow = transferChange.getCreated().getTransfer().getCashflow(); + CashFlowResult cashFlowResult = withdrawalCashFlowComputer.compute( + cashFlow != null ? cashFlow.getPostings() : null); + + return WithdrawalMappingResult.builder() + .stateSnapshot(currentState.toBuilder() + .amount(cashFlowResult.getAmount()) + .systemFee(cashFlowResult.getSystemFee()) + .providerFee(cashFlowResult.getProviderFee()) + .externalFee(cashFlowResult.getExternalFee()) + .lastSequenceId(context.getMachineEvent().getEventId()) + .updatedAt(context.getEventTime()) + .build()) + .build(); + } + + @Override + public EventType getChangeType() { + return EventType.WITHDRAWAL_TRANSFER_CHANGED; + } +} diff --git a/src/main/resources/application.yml b/src/main/resources/application.yml index 4f88a505..6c3bd33f 100644 --- a/src/main/resources/application.yml +++ b/src/main/resources/application.yml @@ -91,6 +91,9 @@ kafka: dominant: initial: mg-events-dominant max.poll.records: 50 + withdrawal: + initial: mg-events-withdrawal + max.poll.records: 50 rate: initial: etl-exchange-rate groupId: analytics-rate-group @@ -107,6 +110,8 @@ kafka: enabled: true dominant: enabled: true + withdrawal: + enabled: true rate: enabled: true diff --git a/src/main/resources/db/migration-clickhouse/non-sharded/V3__recreate_shop_dictionary_with_composite_key.sql b/src/main/resources/db/migration-clickhouse/non-sharded/V3__recreate_shop_dictionary_with_composite_key.sql new file mode 100644 index 00000000..c0a15f65 --- /dev/null +++ b/src/main/resources/db/migration-clickhouse/non-sharded/V3__recreate_shop_dictionary_with_composite_key.sql @@ -0,0 +1,43 @@ +DROP DICTIONARY IF EXISTS analytic.shop_dictionary; + +CREATE DICTIONARY IF NOT EXISTS analytic.shop_dictionary ( + party_id String, + shop_id String, + id UInt64, + event_id UInt64, + event_time DateTime, + category_id Int32, + created_at DateTime, + blocking String, + blocked_reason String, + blocked_since DateTime, + unblocked_reason String, + unblocked_since DateTime, + suspension String, + suspension_active_since DateTime, + suspension_suspended_since DateTime, + details_name String, + details_description String, + location_url String, + account_currency_code String, + account_settlement String, + account_guarantee String, + international_legal_entity_country_code String, + version_id UInt64, + changed_by_id String, + changed_by_email String, + changed_by_name String, + deleted Bool +) +PRIMARY KEY party_id, shop_id +SOURCE(POSTGRESQL( + HOST '<>' + PORT <> + USER '<>' + PASSWORD '<>' + DB '<>' + SCHEMA '<>' + TABLE 'shop' + )) +LAYOUT(COMPLEX_KEY_HASHED()) +LIFETIME(MIN 300 MAX 360); diff --git a/src/main/resources/db/migration-clickhouse/non-sharded/V4__create_withdrawal_table.sql b/src/main/resources/db/migration-clickhouse/non-sharded/V4__create_withdrawal_table.sql new file mode 100644 index 00000000..6e2837b0 --- /dev/null +++ b/src/main/resources/db/migration-clickhouse/non-sharded/V4__create_withdrawal_table.sql @@ -0,0 +1,22 @@ +CREATE TABLE IF NOT EXISTS analytic.events_sink_withdrawal ( + timestamp Date, + eventTime UInt64, + eventTimeHour UInt64, + partyId String, + withdrawalId String, + sequenceId UInt64, + withdrawalTime UInt64, + walletId String, + destinationId String, + providerId String, + terminal String, + amount UInt64, + guaranteeDeposit UInt64, + systemFee UInt64, + providerFee UInt64, + externalFee UInt64, + currency String, + status Enum8('pending' = 1, 'succeeded' = 2, 'failed' = 3) +) ENGINE = ReplacingMergeTree() +PARTITION BY toYYYYMM(timestamp) +ORDER BY (eventTimeHour, partyId, status, currency, providerId, terminal, withdrawalId, sequenceId); diff --git a/src/main/resources/db/migration-clickhouse/sharded/V3__recreate_shop_dictionary_with_composite_key.sql b/src/main/resources/db/migration-clickhouse/sharded/V3__recreate_shop_dictionary_with_composite_key.sql new file mode 100644 index 00000000..c0a15f65 --- /dev/null +++ b/src/main/resources/db/migration-clickhouse/sharded/V3__recreate_shop_dictionary_with_composite_key.sql @@ -0,0 +1,43 @@ +DROP DICTIONARY IF EXISTS analytic.shop_dictionary; + +CREATE DICTIONARY IF NOT EXISTS analytic.shop_dictionary ( + party_id String, + shop_id String, + id UInt64, + event_id UInt64, + event_time DateTime, + category_id Int32, + created_at DateTime, + blocking String, + blocked_reason String, + blocked_since DateTime, + unblocked_reason String, + unblocked_since DateTime, + suspension String, + suspension_active_since DateTime, + suspension_suspended_since DateTime, + details_name String, + details_description String, + location_url String, + account_currency_code String, + account_settlement String, + account_guarantee String, + international_legal_entity_country_code String, + version_id UInt64, + changed_by_id String, + changed_by_email String, + changed_by_name String, + deleted Bool +) +PRIMARY KEY party_id, shop_id +SOURCE(POSTGRESQL( + HOST '<>' + PORT <> + USER '<>' + PASSWORD '<>' + DB '<>' + SCHEMA '<>' + TABLE 'shop' + )) +LAYOUT(COMPLEX_KEY_HASHED()) +LIFETIME(MIN 300 MAX 360); diff --git a/src/main/resources/db/migration-clickhouse/sharded/V4__create_withdrawal_table.sql b/src/main/resources/db/migration-clickhouse/sharded/V4__create_withdrawal_table.sql new file mode 100644 index 00000000..10e1dad4 --- /dev/null +++ b/src/main/resources/db/migration-clickhouse/sharded/V4__create_withdrawal_table.sql @@ -0,0 +1,25 @@ +CREATE TABLE IF NOT EXISTS analytic.events_sink_withdrawal_local ( + timestamp Date, + eventTime UInt64, + eventTimeHour UInt64, + partyId String, + withdrawalId String, + sequenceId UInt64, + withdrawalTime UInt64, + walletId String, + destinationId String, + providerId String, + terminal String, + amount UInt64, + guaranteeDeposit UInt64, + systemFee UInt64, + providerFee UInt64, + externalFee UInt64, + currency String, + status Enum8('pending' = 1, 'succeeded' = 2, 'failed' = 3) +) ENGINE = ReplicatedReplacingMergeTree('/clickhouse/<>/tables/<>/{database}/{table}', '<>') +PARTITION BY toYYYYMM(timestamp) +ORDER BY (eventTimeHour, partyId, status, currency, providerId, terminal, withdrawalId, sequenceId); + +CREATE TABLE IF NOT EXISTS analytic.events_sink_withdrawal AS analytic.events_sink_withdrawal_local +ENGINE = Distributed('<>', analytic, events_sink_withdrawal_local, cityHash64(timestamp, partyId)); diff --git a/src/main/resources/db/migration/V21__add_withdrawal_state.sql b/src/main/resources/db/migration/V21__add_withdrawal_state.sql new file mode 100644 index 00000000..c20a4f6f --- /dev/null +++ b/src/main/resources/db/migration/V21__add_withdrawal_state.sql @@ -0,0 +1,18 @@ +CREATE TABLE analytics.withdrawal_state ( + withdrawal_id VARCHAR NOT NULL, + party_id VARCHAR NOT NULL, + wallet_id VARCHAR, + destination_id VARCHAR, + currency VARCHAR, + requested_amount BIGINT, + amount BIGINT, + system_fee BIGINT, + provider_fee BIGINT, + external_fee BIGINT, + withdrawal_created_at TIMESTAMP WITHOUT TIME ZONE, + provider_id VARCHAR, + terminal VARCHAR, + last_sequence_id BIGINT NOT NULL, + updated_at TIMESTAMP WITHOUT TIME ZONE NOT NULL, + CONSTRAINT withdrawal_state_pkey PRIMARY KEY (withdrawal_id) +); diff --git a/src/test/java/dev/vality/analytics/computer/WithdrawalCashFlowComputerTest.java b/src/test/java/dev/vality/analytics/computer/WithdrawalCashFlowComputerTest.java new file mode 100644 index 00000000..23bc55ef --- /dev/null +++ b/src/test/java/dev/vality/analytics/computer/WithdrawalCashFlowComputerTest.java @@ -0,0 +1,48 @@ +package dev.vality.analytics.computer; + +import dev.vality.analytics.domain.CashFlowResult; +import dev.vality.analytics.utils.WithdrawalEventTestUtils; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; + +import java.util.List; + +import static org.hamcrest.MatcherAssert.assertThat; +import static org.hamcrest.Matchers.is; + +public class WithdrawalCashFlowComputerTest { + + private WithdrawalCashFlowComputer withdrawalCashFlowComputer; + + @BeforeEach + public void setUp() { + withdrawalCashFlowComputer = new WithdrawalCashFlowComputer(); + } + + @Test + public void shouldMapAllSupportedPostingTypes() { + CashFlowResult result = withdrawalCashFlowComputer.compute(List.of( + WithdrawalEventTestUtils.merchantToPayout(1000L), + WithdrawalEventTestUtils.merchantToSystem(100L), + WithdrawalEventTestUtils.systemToProvider(20L), + WithdrawalEventTestUtils.systemToExternal(10L))); + + assertThat(result.getAmount(), is(1000L)); + assertThat(result.getSystemFee(), is(100L)); + assertThat(result.getProviderFee(), is(20L)); + assertThat(result.getExternalFee(), is(10L)); + assertThat(result.getGuaranteeDeposit(), is(0L)); + } + + @Test + public void shouldIgnoreUnsupportedPostings() { + CashFlowResult result = withdrawalCashFlowComputer.compute(List.of( + WithdrawalEventTestUtils.unrelatedPosting(999L))); + + assertThat(result.getAmount(), is(0L)); + assertThat(result.getSystemFee(), is(0L)); + assertThat(result.getProviderFee(), is(0L)); + assertThat(result.getExternalFee(), is(0L)); + assertThat(result.getGuaranteeDeposit(), is(0L)); + } +} diff --git a/src/test/java/dev/vality/analytics/config/KafkaTest.java b/src/test/java/dev/vality/analytics/config/KafkaTest.java index fb399da9..d311667b 100644 --- a/src/test/java/dev/vality/analytics/config/KafkaTest.java +++ b/src/test/java/dev/vality/analytics/config/KafkaTest.java @@ -17,7 +17,8 @@ "kafka.topic.event.sink.initial", "kafka.topic.party.initial", "kafka.topic.rate.initial", - "kafka.topic.dominant.initial" + "kafka.topic.dominant.initial", + "kafka.topic.withdrawal.initial" }) @KafkaTestConfig public @interface KafkaTest { diff --git a/src/test/java/dev/vality/analytics/listener/WithdrawalListenerTest.java b/src/test/java/dev/vality/analytics/listener/WithdrawalListenerTest.java new file mode 100644 index 00000000..94bfdaa2 --- /dev/null +++ b/src/test/java/dev/vality/analytics/listener/WithdrawalListenerTest.java @@ -0,0 +1,93 @@ +package dev.vality.analytics.listener; + +import dev.vality.analytics.config.SpringBootITest; +import dev.vality.analytics.utils.WithdrawalEventTestUtils; +import dev.vality.testcontainers.annotations.kafka.config.KafkaProducer; +import org.apache.thrift.TBase; +import org.awaitility.Awaitility; +import org.junit.jupiter.api.Test; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.beans.factory.annotation.Value; +import org.springframework.jdbc.core.JdbcTemplate; + +import java.time.LocalDateTime; +import java.util.List; +import java.util.Map; +import java.util.concurrent.TimeUnit; + +import static org.junit.jupiter.api.Assertions.assertEquals; + +@SpringBootITest +public class WithdrawalListenerTest { + + @Value("${kafka.topic.withdrawal.initial}") + private String withdrawalTopic; + + @Autowired + private JdbcTemplate postgresJdbcTemplate; + @Autowired + private JdbcTemplate clickHouseJdbcTemplate; + @Autowired + private KafkaProducer> testThriftKafkaProducer; + + @Test + public void shouldReduceWithdrawalStateAndWriteClickHouseSnapshot() { + seedShopDictionary(); + + List> flow = WithdrawalEventTestUtils.fullSuccessFlow(); + flow.forEach(event -> testThriftKafkaProducer.send(withdrawalTopic, event)); + + Awaitility.await().atMost(60, TimeUnit.SECONDS).until(() -> { + Integer count = clickHouseJdbcTemplate.queryForObject( + "SELECT count(*) FROM analytic.events_sink_withdrawal WHERE withdrawalId = ?", + Integer.class, + WithdrawalEventTestUtils.WITHDRAWAL_ID); + return count != null && count == 1; + }); + + Map stateRow = postgresJdbcTemplate.queryForMap( + "SELECT * FROM analytics.withdrawal_state WHERE withdrawal_id = ?", + WithdrawalEventTestUtils.WITHDRAWAL_ID); + assertEquals(WithdrawalEventTestUtils.PARTY_ID, stateRow.get("party_id")); + assertEquals(WithdrawalEventTestUtils.CURRENCY, stateRow.get("currency")); + assertEquals(1200L, ((Number) stateRow.get("amount")).longValue()); + assertEquals(100L, ((Number) stateRow.get("system_fee")).longValue()); + assertEquals(20L, ((Number) stateRow.get("provider_fee")).longValue()); + assertEquals(10L, ((Number) stateRow.get("external_fee")).longValue()); + assertEquals("42", stateRow.get("provider_id")); + assertEquals("24", stateRow.get("terminal")); + + Map withdrawalRow = clickHouseJdbcTemplate.queryForMap( + "SELECT partyId, currency, providerId, terminal, amount, systemFee, providerFee, externalFee, status " + + "FROM analytic.events_sink_withdrawal WHERE withdrawalId = ? AND status = 'succeeded'", + WithdrawalEventTestUtils.WITHDRAWAL_ID); + assertEquals(WithdrawalEventTestUtils.PARTY_ID, withdrawalRow.get("partyId")); + assertEquals(WithdrawalEventTestUtils.CURRENCY, withdrawalRow.get("currency")); + assertEquals("42", withdrawalRow.get("providerId")); + assertEquals("24", withdrawalRow.get("terminal")); + assertEquals(1200L, ((Number) withdrawalRow.get("amount")).longValue()); + assertEquals(100L, ((Number) withdrawalRow.get("systemFee")).longValue()); + assertEquals(20L, ((Number) withdrawalRow.get("providerFee")).longValue()); + assertEquals(10L, ((Number) withdrawalRow.get("externalFee")).longValue()); + + clickHouseJdbcTemplate.execute("SYSTEM RELOAD DICTIONARY analytic.shop_dictionary"); + String locationUrl = clickHouseJdbcTemplate.queryForObject( + "SELECT dictGet('analytic.shop_dictionary', 'location_url', tuple(?, ?))", + String.class, + WithdrawalEventTestUtils.PARTY_ID, + "shop-dict-1"); + assertEquals("https://merchant.example/shop-dict-1", locationUrl); + } + + private void seedShopDictionary() { + postgresJdbcTemplate.update( + "INSERT INTO analytics.shop " + + "(event_id, event_time, party_id, shop_id, location_url) " + + "VALUES (?, ?, ?, ?, ?)", + 1L, + LocalDateTime.of(2024, 1, 10, 10, 0), + WithdrawalEventTestUtils.PARTY_ID, + "shop-dict-1", + "https://merchant.example/shop-dict-1"); + } +} diff --git a/src/test/java/dev/vality/analytics/listener/handler/withdrawal/WithdrawalEventHandlerTest.java b/src/test/java/dev/vality/analytics/listener/handler/withdrawal/WithdrawalEventHandlerTest.java new file mode 100644 index 00000000..a04056d9 --- /dev/null +++ b/src/test/java/dev/vality/analytics/listener/handler/withdrawal/WithdrawalEventHandlerTest.java @@ -0,0 +1,199 @@ +package dev.vality.analytics.listener.handler.withdrawal; + +import dev.vality.analytics.computer.WithdrawalCashFlowComputer; +import dev.vality.analytics.dao.model.WithdrawalRow; +import dev.vality.analytics.dao.model.WithdrawalStateSnapshot; +import dev.vality.analytics.dao.repository.clickhouse.ClickHouseWithdrawalRepository; +import dev.vality.analytics.dao.repository.postgres.PostgresWithdrawalStateRepository; +import dev.vality.analytics.listener.mapper.withdrawal.WithdrawalCreatedMapper; +import dev.vality.analytics.listener.mapper.withdrawal.WithdrawalRouteMapper; +import dev.vality.analytics.listener.mapper.withdrawal.WithdrawalStatusMapper; +import dev.vality.analytics.listener.mapper.withdrawal.WithdrawalTransferMapper; +import dev.vality.analytics.utils.WithdrawalEventTestUtils; +import dev.vality.fistful.withdrawal.TimestampedChange; +import dev.vality.machinegun.eventsink.MachineEvent; +import dev.vality.sink.common.parser.impl.MachineEventParser; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.extension.ExtendWith; +import org.mockito.ArgumentCaptor; +import org.mockito.Mock; +import org.mockito.junit.jupiter.MockitoExtension; + +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Optional; + +import static org.hamcrest.MatcherAssert.assertThat; +import static org.hamcrest.Matchers.hasSize; +import static org.hamcrest.Matchers.is; +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.ArgumentMatchers.anyString; +import static org.mockito.Mockito.doAnswer; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; + +@ExtendWith(MockitoExtension.class) +public class WithdrawalEventHandlerTest { + + @Mock + private MachineEventParser machineEventParser; + @Mock + private PostgresWithdrawalStateRepository postgresWithdrawalStateRepository; + @Mock + private ClickHouseWithdrawalRepository clickHouseWithdrawalRepository; + + private WithdrawalEventHandler withdrawalEventHandler; + private Map store; + + @BeforeEach + public void setUp() { + withdrawalEventHandler = new WithdrawalEventHandler( + machineEventParser, + postgresWithdrawalStateRepository, + clickHouseWithdrawalRepository, + List.of( + new WithdrawalCreatedMapper(), + new WithdrawalRouteMapper(), + new WithdrawalTransferMapper(new WithdrawalCashFlowComputer()), + new WithdrawalStatusMapper())); + store = new HashMap<>(); + + when(postgresWithdrawalStateRepository.findByWithdrawalId(anyString())) + .thenAnswer(invocation -> Optional.ofNullable(store.get(invocation.getArgument(0)))); + doAnswer(invocation -> { + WithdrawalStateSnapshot snapshot = invocation.getArgument(0); + store.put(snapshot.getWithdrawalId(), snapshot); + return null; + }).when(postgresWithdrawalStateRepository).upsert(any(WithdrawalStateSnapshot.class)); + } + + @Test + public void createdShouldInitializeReducerState() { + MachineEvent machineEvent = WithdrawalEventTestUtils.machineEvent(1L, + WithdrawalEventTestUtils.createdChange(1500L, 42, 24)); + when(machineEventParser.parse(machineEvent)) + .thenReturn(WithdrawalEventTestUtils.createdChange(1500L, 42, 24)); + + withdrawalEventHandler.handle(List.of(machineEvent)); + + WithdrawalStateSnapshot snapshot = store.get(WithdrawalEventTestUtils.WITHDRAWAL_ID); + assertEquals(WithdrawalEventTestUtils.PARTY_ID, snapshot.getPartyId()); + assertEquals(WithdrawalEventTestUtils.WALLET_ID, snapshot.getWalletId()); + assertEquals(WithdrawalEventTestUtils.DESTINATION_ID, snapshot.getDestinationId()); + assertEquals(WithdrawalEventTestUtils.CURRENCY, snapshot.getCurrency()); + assertEquals(1500L, snapshot.getRequestedAmount()); + assertEquals("42", snapshot.getProviderId()); + assertEquals("24", snapshot.getTerminal()); + } + + @Test + public void routeShouldUpdateProviderAndTerminal() { + store.put(WithdrawalEventTestUtils.WITHDRAWAL_ID, WithdrawalStateSnapshot.builder() + .withdrawalId(WithdrawalEventTestUtils.WITHDRAWAL_ID) + .partyId(WithdrawalEventTestUtils.PARTY_ID) + .lastSequenceId(1L) + .build()); + + MachineEvent machineEvent = WithdrawalEventTestUtils.machineEvent(2L, WithdrawalEventTestUtils.routeChange(77, 55)); + when(machineEventParser.parse(machineEvent)).thenReturn(WithdrawalEventTestUtils.routeChange(77, 55)); + + withdrawalEventHandler.handle(List.of(machineEvent)); + + WithdrawalStateSnapshot snapshot = store.get(WithdrawalEventTestUtils.WITHDRAWAL_ID); + assertEquals("77", snapshot.getProviderId()); + assertEquals("55", snapshot.getTerminal()); + assertEquals(2L, snapshot.getLastSequenceId()); + } + + @Test + public void transferCreatedShouldUpdateMonetaryFields() { + store.put(WithdrawalEventTestUtils.WITHDRAWAL_ID, WithdrawalStateSnapshot.builder() + .withdrawalId(WithdrawalEventTestUtils.WITHDRAWAL_ID) + .partyId(WithdrawalEventTestUtils.PARTY_ID) + .lastSequenceId(1L) + .build()); + + MachineEvent machineEvent = WithdrawalEventTestUtils.machineEvent(3L, + WithdrawalEventTestUtils.transferCreatedChange(1200L, 100L, 20L, 10L)); + when(machineEventParser.parse(machineEvent)) + .thenReturn(WithdrawalEventTestUtils.transferCreatedChange(1200L, 100L, 20L, 10L)); + + withdrawalEventHandler.handle(List.of(machineEvent)); + + WithdrawalStateSnapshot snapshot = store.get(WithdrawalEventTestUtils.WITHDRAWAL_ID); + assertEquals(1200L, snapshot.getAmount()); + assertEquals(100L, snapshot.getSystemFee()); + assertEquals(20L, snapshot.getProviderFee()); + assertEquals(10L, snapshot.getExternalFee()); + } + + @Test + public void statusChangedShouldWriteSnapshotRowUsingCurrentState() { + store.put(WithdrawalEventTestUtils.WITHDRAWAL_ID, WithdrawalStateSnapshot.builder() + .withdrawalId(WithdrawalEventTestUtils.WITHDRAWAL_ID) + .partyId(WithdrawalEventTestUtils.PARTY_ID) + .walletId(WithdrawalEventTestUtils.WALLET_ID) + .destinationId(WithdrawalEventTestUtils.DESTINATION_ID) + .currency(WithdrawalEventTestUtils.CURRENCY) + .withdrawalCreatedAt(java.time.LocalDateTime.parse("2024-01-10T10:15:30")) + .providerId("42") + .terminal("24") + .amount(1200L) + .systemFee(100L) + .providerFee(20L) + .externalFee(10L) + .lastSequenceId(3L) + .build()); + + MachineEvent machineEvent = WithdrawalEventTestUtils.machineEvent(4L, WithdrawalEventTestUtils.succeededStatusChange()); + when(machineEventParser.parse(machineEvent)).thenReturn(WithdrawalEventTestUtils.succeededStatusChange()); + + withdrawalEventHandler.handle(List.of(machineEvent)); + + ArgumentCaptor> rowsCaptor = ArgumentCaptor.forClass(List.class); + verify(clickHouseWithdrawalRepository).insertBatch(rowsCaptor.capture()); + List rows = rowsCaptor.getValue(); + assertThat(rows, hasSize(1)); + WithdrawalRow row = rows.get(0); + assertThat(row.getPartyId(), is(WithdrawalEventTestUtils.PARTY_ID)); + assertThat(row.getCurrency(), is(WithdrawalEventTestUtils.CURRENCY)); + assertThat(row.getProviderId(), is("42")); + assertThat(row.getTerminal(), is("24")); + assertThat(row.getAmount(), is(1200L)); + assertThat(row.getSystemFee(), is(100L)); + assertThat(row.getProviderFee(), is(20L)); + assertThat(row.getExternalFee(), is(10L)); + assertThat(row.getStatus().name(), is("succeeded")); + } + + @Test + public void statusChangedShouldFallbackToRequestedAmountWhenTransferMissing() { + store.put(WithdrawalEventTestUtils.WITHDRAWAL_ID, WithdrawalStateSnapshot.builder() + .withdrawalId(WithdrawalEventTestUtils.WITHDRAWAL_ID) + .partyId(WithdrawalEventTestUtils.PARTY_ID) + .walletId(WithdrawalEventTestUtils.WALLET_ID) + .destinationId(WithdrawalEventTestUtils.DESTINATION_ID) + .currency(WithdrawalEventTestUtils.CURRENCY) + .withdrawalCreatedAt(java.time.LocalDateTime.parse("2024-01-10T10:15:30")) + .requestedAmount(1500L) + .lastSequenceId(1L) + .build()); + + MachineEvent machineEvent = WithdrawalEventTestUtils.machineEvent(2L, WithdrawalEventTestUtils.pendingStatusChange()); + when(machineEventParser.parse(machineEvent)).thenReturn(WithdrawalEventTestUtils.pendingStatusChange()); + + withdrawalEventHandler.handle(List.of(machineEvent)); + + ArgumentCaptor> rowsCaptor = ArgumentCaptor.forClass(List.class); + verify(clickHouseWithdrawalRepository).insertBatch(rowsCaptor.capture()); + WithdrawalRow row = rowsCaptor.getValue().get(0); + assertThat(row.getAmount(), is(1500L)); + assertThat(row.getSystemFee(), is(0L)); + assertThat(row.getProviderFee(), is(0L)); + assertThat(row.getExternalFee(), is(0L)); + assertThat(row.getStatus().name(), is("pending")); + } +} diff --git a/src/test/java/dev/vality/analytics/utils/WithdrawalEventTestUtils.java b/src/test/java/dev/vality/analytics/utils/WithdrawalEventTestUtils.java new file mode 100644 index 00000000..cf5dcce6 --- /dev/null +++ b/src/test/java/dev/vality/analytics/utils/WithdrawalEventTestUtils.java @@ -0,0 +1,174 @@ +package dev.vality.analytics.utils; + +import dev.vality.fistful.base.Cash; +import dev.vality.fistful.base.CurrencyRef; +import dev.vality.fistful.cashflow.CashFlowAccount; +import dev.vality.fistful.cashflow.ExternalCashFlowAccount; +import dev.vality.fistful.cashflow.FinalCashFlow; +import dev.vality.fistful.cashflow.FinalCashFlowAccount; +import dev.vality.fistful.cashflow.FinalCashFlowPosting; +import dev.vality.fistful.cashflow.MerchantCashFlowAccount; +import dev.vality.fistful.cashflow.ProviderCashFlowAccount; +import dev.vality.fistful.cashflow.SystemCashFlowAccount; +import dev.vality.fistful.transfer.Transfer; +import dev.vality.fistful.withdrawal.Change; +import dev.vality.fistful.withdrawal.CreatedChange; +import dev.vality.fistful.withdrawal.Route; +import dev.vality.fistful.withdrawal.RouteChange; +import dev.vality.fistful.withdrawal.StatusChange; +import dev.vality.fistful.withdrawal.TimestampedChange; +import dev.vality.fistful.withdrawal.TransferChange; +import dev.vality.fistful.withdrawal.Withdrawal; +import dev.vality.fistful.withdrawal.status.Pending; +import dev.vality.fistful.withdrawal.status.Status; +import dev.vality.fistful.withdrawal.status.Succeeded; +import dev.vality.geck.common.util.TypeUtil; +import dev.vality.geck.serializer.Geck; +import dev.vality.machinegun.eventsink.MachineEvent; +import dev.vality.machinegun.eventsink.SinkEvent; +import org.apache.thrift.TBase; + +import java.time.Instant; +import java.util.List; + +public final class WithdrawalEventTestUtils { + + public static final String WITHDRAWAL_ID = "withdrawal-1"; + public static final String PARTY_ID = "party-1"; + public static final String WALLET_ID = "wallet-1"; + public static final String DESTINATION_ID = "destination-1"; + public static final String CURRENCY = "RUB"; + public static final String CREATED_AT = "2024-01-10T10:15:30Z"; + public static final String OCCURRED_AT = "2024-01-10T10:15:35Z"; + + private WithdrawalEventTestUtils() { + } + + public static TimestampedChange createdChange(long requestedAmount, Integer providerId, Integer terminalId) { + Withdrawal withdrawal = new Withdrawal() + .setId(WITHDRAWAL_ID) + .setPartyId(PARTY_ID) + .setWalletId(WALLET_ID) + .setDestinationId(DESTINATION_ID) + .setBody(new Cash().setAmount(requestedAmount).setCurrency(new CurrencyRef(CURRENCY))) + .setCreatedAt(CREATED_AT) + .setDomainRevision(1L); + + if (providerId != null) { + Route route = new Route().setProviderId(providerId); + if (terminalId != null) { + route.setTerminalId(terminalId); + } + withdrawal.setRoute(route); + } + + return new TimestampedChange() + .setOccuredAt(OCCURRED_AT) + .setChange(Change.created(new CreatedChange().setWithdrawal(withdrawal))); + } + + public static TimestampedChange routeChange(int providerId, Integer terminalId) { + Route route = new Route().setProviderId(providerId); + if (terminalId != null) { + route.setTerminalId(terminalId); + } + + return new TimestampedChange() + .setOccuredAt(OCCURRED_AT) + .setChange(Change.route(new RouteChange().setRoute(route))); + } + + public static TimestampedChange transferCreatedChange(long amount, long systemFee, long providerFee, long externalFee) { + FinalCashFlow cashFlow = new FinalCashFlow().setPostings(List.of( + merchantToPayout(amount), + merchantToSystem(systemFee), + systemToProvider(providerFee), + systemToExternal(externalFee), + unrelatedPosting(999L))); + + Transfer transfer = new Transfer() + .setId("transfer-1") + .setCashflow(cashFlow); + + return new TimestampedChange() + .setOccuredAt(OCCURRED_AT) + .setChange(Change.transfer(new TransferChange().setPayload( + dev.vality.fistful.transfer.Change.created( + new dev.vality.fistful.transfer.CreatedChange().setTransfer(transfer))))); + } + + public static TimestampedChange pendingStatusChange() { + return new TimestampedChange() + .setOccuredAt(OCCURRED_AT) + .setChange(Change.status_changed(new StatusChange().setStatus(Status.pending(new Pending())))); + } + + public static TimestampedChange succeededStatusChange() { + return new TimestampedChange() + .setOccuredAt(OCCURRED_AT) + .setChange(Change.status_changed(new StatusChange().setStatus(Status.succeeded(new Succeeded())))); + } + + public static MachineEvent machineEvent(long eventId, TimestampedChange timestampedChange) { + return new MachineEvent() + .setSourceNs("withdrawal") + .setSourceId(WITHDRAWAL_ID) + .setEventId(eventId) + .setCreatedAt(TypeUtil.temporalToString(Instant.parse(OCCURRED_AT))) + .setData(dev.vality.machinegun.msgpack.Value.bin(Geck.toMsgPack(timestampedChange))); + } + + public static SinkEvent sinkEvent(long eventId, TimestampedChange timestampedChange) { + return SinkEvent.event(machineEvent(eventId, timestampedChange)); + } + + public static List> fullSuccessFlow() { + return List.of( + sinkEvent(1L, createdChange(1500L, null, null)), + sinkEvent(2L, routeChange(42, 24)), + sinkEvent(3L, transferCreatedChange(1200L, 100L, 20L, 10L)), + sinkEvent(4L, succeededStatusChange())); + } + + public static FinalCashFlowPosting merchantToPayout(long amount) { + return posting( + CashFlowAccount.merchant(MerchantCashFlowAccount.settlement), + CashFlowAccount.merchant(MerchantCashFlowAccount.payout), + amount); + } + + public static FinalCashFlowPosting merchantToSystem(long amount) { + return posting( + CashFlowAccount.merchant(MerchantCashFlowAccount.settlement), + CashFlowAccount.system(SystemCashFlowAccount.settlement), + amount); + } + + public static FinalCashFlowPosting systemToProvider(long amount) { + return posting( + CashFlowAccount.system(SystemCashFlowAccount.settlement), + CashFlowAccount.provider(ProviderCashFlowAccount.settlement), + amount); + } + + public static FinalCashFlowPosting systemToExternal(long amount) { + return posting( + CashFlowAccount.system(SystemCashFlowAccount.settlement), + CashFlowAccount.external(ExternalCashFlowAccount.outcome), + amount); + } + + public static FinalCashFlowPosting unrelatedPosting(long amount) { + return posting( + CashFlowAccount.provider(ProviderCashFlowAccount.settlement), + CashFlowAccount.system(SystemCashFlowAccount.settlement), + amount); + } + + private static FinalCashFlowPosting posting(CashFlowAccount source, CashFlowAccount destination, long amount) { + return new FinalCashFlowPosting() + .setSource(new FinalCashFlowAccount().setAccountType(source)) + .setDestination(new FinalCashFlowAccount().setAccountType(destination)) + .setVolume(new Cash().setAmount(amount).setCurrency(new CurrencyRef(CURRENCY))); + } +} diff --git a/src/test/resources/application.properties b/src/test/resources/application.properties index 9bfd0ae6..3374f194 100644 --- a/src/test/resources/application.properties +++ b/src/test/resources/application.properties @@ -1 +1,2 @@ clickhouse.flyway.enabled=false +kafka.topic.withdrawal.initial=mg-events-withdrawal From dc40db9c518df4abfa86825dbc41a0c726acc686 Mon Sep 17 00:00:00 2001 From: vitaxa Date: Wed, 4 Mar 2026 16:04:08 +0300 Subject: [PATCH 2/4] fix checkstyle --- ...ithdrawalBatchPreparedStatementSetter.java | 14 ++++-- .../PostgresWithdrawalStateRepository.java | 49 ++++++++++--------- .../WithdrawalEventHandlerTest.java | 12 +++-- .../utils/WithdrawalEventTestUtils.java | 6 ++- 4 files changed, 49 insertions(+), 32 deletions(-) diff --git a/src/main/java/dev/vality/analytics/dao/repository/clickhouse/ClickHouseWithdrawalBatchPreparedStatementSetter.java b/src/main/java/dev/vality/analytics/dao/repository/clickhouse/ClickHouseWithdrawalBatchPreparedStatementSetter.java index 3fa4ce32..74d63810 100644 --- a/src/main/java/dev/vality/analytics/dao/repository/clickhouse/ClickHouseWithdrawalBatchPreparedStatementSetter.java +++ b/src/main/java/dev/vality/analytics/dao/repository/clickhouse/ClickHouseWithdrawalBatchPreparedStatementSetter.java @@ -12,10 +12,12 @@ public class ClickHouseWithdrawalBatchPreparedStatementSetter implements BatchPreparedStatementSetter { - public static final String INSERT = "INSERT INTO analytic.events_sink_withdrawal " - + "(timestamp, eventTime, eventTimeHour, partyId, withdrawalId, sequenceId, withdrawalTime, walletId, " - + "destinationId, providerId, terminal, amount, guaranteeDeposit, systemFee, providerFee, externalFee, " - + "currency, status) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)"; + public static final String INSERT = "INSERT INTO analytic.events_sink_withdrawal " + + "(timestamp, eventTime, eventTimeHour, partyId, withdrawalId, sequenceId, withdrawalTime, " + + "walletId, " + + "destinationId, providerId, terminal, amount, guaranteeDeposit, systemFee, providerFee, " + + "externalFee, " + + "currency, status) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)"; private final List batch; @@ -29,7 +31,9 @@ public void setValues(PreparedStatement ps, int i) throws SQLException { int column = 1; ps.setObject(column++, row.getEventTime().toLocalDate()); ps.setLong(column++, row.getEventTime().toEpochSecond(ZoneOffset.UTC)); - ps.setLong(column++, row.getEventTime().toInstant(ZoneOffset.UTC).truncatedTo(ChronoUnit.HOURS).toEpochMilli()); + ps.setLong( + column++, + row.getEventTime().toInstant(ZoneOffset.UTC).truncatedTo(ChronoUnit.HOURS).toEpochMilli()); ps.setString(column++, defaultString(row.getPartyId())); ps.setString(column++, defaultString(row.getWithdrawalId())); ps.setLong(column++, row.getSequenceId()); diff --git a/src/main/java/dev/vality/analytics/dao/repository/postgres/PostgresWithdrawalStateRepository.java b/src/main/java/dev/vality/analytics/dao/repository/postgres/PostgresWithdrawalStateRepository.java index aa16cae0..0a382266 100644 --- a/src/main/java/dev/vality/analytics/dao/repository/postgres/PostgresWithdrawalStateRepository.java +++ b/src/main/java/dev/vality/analytics/dao/repository/postgres/PostgresWithdrawalStateRepository.java @@ -15,29 +15,32 @@ @RequiredArgsConstructor public class PostgresWithdrawalStateRepository { - private static final String UPSERT = "INSERT INTO analytics.withdrawal_state " - + "(withdrawal_id, party_id, wallet_id, destination_id, currency, requested_amount, amount, system_fee, " - + "provider_fee, external_fee, withdrawal_created_at, provider_id, terminal, last_sequence_id, updated_at) " - + "VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?) " - + "ON CONFLICT (withdrawal_id) DO UPDATE SET " - + "party_id = EXCLUDED.party_id, " - + "wallet_id = EXCLUDED.wallet_id, " - + "destination_id = EXCLUDED.destination_id, " - + "currency = EXCLUDED.currency, " - + "requested_amount = EXCLUDED.requested_amount, " - + "amount = EXCLUDED.amount, " - + "system_fee = EXCLUDED.system_fee, " - + "provider_fee = EXCLUDED.provider_fee, " - + "external_fee = EXCLUDED.external_fee, " - + "withdrawal_created_at = EXCLUDED.withdrawal_created_at, " - + "provider_id = EXCLUDED.provider_id, " - + "terminal = EXCLUDED.terminal, " - + "last_sequence_id = EXCLUDED.last_sequence_id, " - + "updated_at = EXCLUDED.updated_at"; - private static final String SELECT = "SELECT withdrawal_id, party_id, wallet_id, destination_id, currency, " - + "requested_amount, amount, system_fee, provider_fee, external_fee, withdrawal_created_at, provider_id, " - + "terminal, last_sequence_id, updated_at " - + "FROM analytics.withdrawal_state WHERE withdrawal_id = ?"; + private static final String UPSERT = "INSERT INTO analytics.withdrawal_state " + + "(withdrawal_id, party_id, wallet_id, destination_id, currency, requested_amount, amount, " + + "system_fee, " + + "provider_fee, external_fee, withdrawal_created_at, provider_id, terminal, " + + "last_sequence_id, " + + "updated_at) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?) " + + "ON CONFLICT (withdrawal_id) DO UPDATE SET " + + "party_id = EXCLUDED.party_id, " + + "wallet_id = EXCLUDED.wallet_id, " + + "destination_id = EXCLUDED.destination_id, " + + "currency = EXCLUDED.currency, " + + "requested_amount = EXCLUDED.requested_amount, " + + "amount = EXCLUDED.amount, " + + "system_fee = EXCLUDED.system_fee, " + + "provider_fee = EXCLUDED.provider_fee, " + + "external_fee = EXCLUDED.external_fee, " + + "withdrawal_created_at = EXCLUDED.withdrawal_created_at, " + + "provider_id = EXCLUDED.provider_id, " + + "terminal = EXCLUDED.terminal, " + + "last_sequence_id = EXCLUDED.last_sequence_id, " + + "updated_at = EXCLUDED.updated_at"; + private static final String SELECT = "SELECT withdrawal_id, party_id, wallet_id, destination_id, currency, " + + "requested_amount, amount, system_fee, provider_fee, external_fee, withdrawal_created_at, " + + "provider_id, terminal, last_sequence_id, updated_at " + + "FROM analytics.withdrawal_state " + + "WHERE withdrawal_id = ?"; private final JdbcTemplate postgresJdbcTemplate; diff --git a/src/test/java/dev/vality/analytics/listener/handler/withdrawal/WithdrawalEventHandlerTest.java b/src/test/java/dev/vality/analytics/listener/handler/withdrawal/WithdrawalEventHandlerTest.java index a04056d9..9e95dc62 100644 --- a/src/test/java/dev/vality/analytics/listener/handler/withdrawal/WithdrawalEventHandlerTest.java +++ b/src/test/java/dev/vality/analytics/listener/handler/withdrawal/WithdrawalEventHandlerTest.java @@ -97,7 +97,9 @@ public void routeShouldUpdateProviderAndTerminal() { .lastSequenceId(1L) .build()); - MachineEvent machineEvent = WithdrawalEventTestUtils.machineEvent(2L, WithdrawalEventTestUtils.routeChange(77, 55)); + MachineEvent machineEvent = WithdrawalEventTestUtils.machineEvent( + 2L, + WithdrawalEventTestUtils.routeChange(77, 55)); when(machineEventParser.parse(machineEvent)).thenReturn(WithdrawalEventTestUtils.routeChange(77, 55)); withdrawalEventHandler.handle(List.of(machineEvent)); @@ -148,7 +150,9 @@ public void statusChangedShouldWriteSnapshotRowUsingCurrentState() { .lastSequenceId(3L) .build()); - MachineEvent machineEvent = WithdrawalEventTestUtils.machineEvent(4L, WithdrawalEventTestUtils.succeededStatusChange()); + MachineEvent machineEvent = WithdrawalEventTestUtils.machineEvent( + 4L, + WithdrawalEventTestUtils.succeededStatusChange()); when(machineEventParser.parse(machineEvent)).thenReturn(WithdrawalEventTestUtils.succeededStatusChange()); withdrawalEventHandler.handle(List.of(machineEvent)); @@ -182,7 +186,9 @@ public void statusChangedShouldFallbackToRequestedAmountWhenTransferMissing() { .lastSequenceId(1L) .build()); - MachineEvent machineEvent = WithdrawalEventTestUtils.machineEvent(2L, WithdrawalEventTestUtils.pendingStatusChange()); + MachineEvent machineEvent = WithdrawalEventTestUtils.machineEvent( + 2L, + WithdrawalEventTestUtils.pendingStatusChange()); when(machineEventParser.parse(machineEvent)).thenReturn(WithdrawalEventTestUtils.pendingStatusChange()); withdrawalEventHandler.handle(List.of(machineEvent)); diff --git a/src/test/java/dev/vality/analytics/utils/WithdrawalEventTestUtils.java b/src/test/java/dev/vality/analytics/utils/WithdrawalEventTestUtils.java index cf5dcce6..8d72961a 100644 --- a/src/test/java/dev/vality/analytics/utils/WithdrawalEventTestUtils.java +++ b/src/test/java/dev/vality/analytics/utils/WithdrawalEventTestUtils.java @@ -78,7 +78,11 @@ public static TimestampedChange routeChange(int providerId, Integer terminalId) .setChange(Change.route(new RouteChange().setRoute(route))); } - public static TimestampedChange transferCreatedChange(long amount, long systemFee, long providerFee, long externalFee) { + public static TimestampedChange transferCreatedChange( + long amount, + long systemFee, + long providerFee, + long externalFee) { FinalCashFlow cashFlow = new FinalCashFlow().setPostings(List.of( merchantToPayout(amount), merchantToSystem(systemFee), From 4942b1722bb8aa548b7bbe0a5c9d32ce570f6ba5 Mon Sep 17 00:00:00 2001 From: vitaxa Date: Wed, 4 Mar 2026 16:18:51 +0300 Subject: [PATCH 3/4] fix withdrawal id --- .../model/AcceptanceDisbursementRawRow.java | 22 ------------------- .../withdrawal/WithdrawalEventHandler.java | 9 ++------ 2 files changed, 2 insertions(+), 29 deletions(-) delete mode 100644 src/main/java/dev/vality/analytics/dao/model/AcceptanceDisbursementRawRow.java diff --git a/src/main/java/dev/vality/analytics/dao/model/AcceptanceDisbursementRawRow.java b/src/main/java/dev/vality/analytics/dao/model/AcceptanceDisbursementRawRow.java deleted file mode 100644 index cff3f7c7..00000000 --- a/src/main/java/dev/vality/analytics/dao/model/AcceptanceDisbursementRawRow.java +++ /dev/null @@ -1,22 +0,0 @@ -package dev.vality.analytics.dao.model; - -import lombok.AllArgsConstructor; -import lombok.Builder; -import lombok.Data; -import lombok.NoArgsConstructor; - -import java.time.LocalDate; - -@Data -@Builder -@NoArgsConstructor -@AllArgsConstructor -public class AcceptanceDisbursementRawRow { - - private LocalDate date; - private String currency; - private String locationUrl; - private long turnover; - private long cost; - -} diff --git a/src/main/java/dev/vality/analytics/listener/handler/withdrawal/WithdrawalEventHandler.java b/src/main/java/dev/vality/analytics/listener/handler/withdrawal/WithdrawalEventHandler.java index 0e67ec20..8334ad98 100644 --- a/src/main/java/dev/vality/analytics/listener/handler/withdrawal/WithdrawalEventHandler.java +++ b/src/main/java/dev/vality/analytics/listener/handler/withdrawal/WithdrawalEventHandler.java @@ -71,7 +71,7 @@ private WithdrawalEventContext prepareContext( return null; } - String withdrawalId = resolveWithdrawalId(machineEvent, timestampedChange); + String withdrawalId = resolveWithdrawalId(machineEvent); if (withdrawalId == null) { log.warn("Skipping withdrawal event without withdrawal id, eventId={}", machineEvent.getEventId()); return null; @@ -158,15 +158,10 @@ private TimestampedChange parse(MachineEvent machineEvent) { } } - private String resolveWithdrawalId(MachineEvent machineEvent, TimestampedChange timestampedChange) { + private String resolveWithdrawalId(MachineEvent machineEvent) { if (machineEvent.isSetSourceId()) { return machineEvent.getSourceId(); } - - if (timestampedChange.getChange().isSetCreated()) { - return timestampedChange.getChange().getCreated().getWithdrawal().getId(); - } - return null; } From 7c871e3bd1f50d6a3587590a07dbe0a85f9b9165 Mon Sep 17 00:00:00 2001 From: vitaxa Date: Thu, 5 Mar 2026 14:42:12 +0300 Subject: [PATCH 4/4] review fix --- .../computer/WithdrawalCashFlowComputer.java | 104 +++++++++--------- .../analytics/dao/model/WithdrawalRow.java | 2 - .../dao/model/WithdrawalStateSnapshot.java | 1 - ...ithdrawalBatchPreparedStatementSetter.java | 7 +- .../PostgresWithdrawalStateRepository.java | 9 +- .../domain/WithdrawalCashFlowResult.java | 15 +++ .../withdrawal/WithdrawalEventHandler.java | 46 +++----- .../mapper/rate/CurrencyEventMapper.java | 11 +- .../withdrawal/AbstractWithdrawalMapper.java | 12 +- .../withdrawal/WithdrawalStatusMapper.java | 2 - .../withdrawal/WithdrawalTransferMapper.java | 5 +- .../vality/analytics/utils/TimestampUtil.java | 23 ++++ .../V4__create_withdrawal_table.sql | 4 +- .../sharded/V4__create_withdrawal_table.sql | 4 +- .../migration/V21__add_withdrawal_state.sql | 1 - .../WithdrawalCashFlowComputerTest.java | 29 +++-- .../listener/WithdrawalListenerTest.java | 4 +- .../WithdrawalEventHandlerTest.java | 8 +- .../utils/WithdrawalEventTestUtils.java | 22 ++-- 19 files changed, 160 insertions(+), 149 deletions(-) create mode 100644 src/main/java/dev/vality/analytics/domain/WithdrawalCashFlowResult.java create mode 100644 src/main/java/dev/vality/analytics/utils/TimestampUtil.java diff --git a/src/main/java/dev/vality/analytics/computer/WithdrawalCashFlowComputer.java b/src/main/java/dev/vality/analytics/computer/WithdrawalCashFlowComputer.java index 23009005..5d10e80d 100644 --- a/src/main/java/dev/vality/analytics/computer/WithdrawalCashFlowComputer.java +++ b/src/main/java/dev/vality/analytics/computer/WithdrawalCashFlowComputer.java @@ -1,86 +1,92 @@ package dev.vality.analytics.computer; -import dev.vality.analytics.domain.CashFlowResult; -import dev.vality.fistful.cashflow.ExternalCashFlowAccount; +import dev.vality.analytics.domain.WithdrawalCashFlowResult; +import dev.vality.fistful.cashflow.CashFlowAccount; import dev.vality.fistful.cashflow.FinalCashFlowPosting; import dev.vality.fistful.cashflow.MerchantCashFlowAccount; import dev.vality.fistful.cashflow.ProviderCashFlowAccount; import dev.vality.fistful.cashflow.SystemCashFlowAccount; +import dev.vality.fistful.cashflow.WalletCashFlowAccount; import org.springframework.stereotype.Service; import org.springframework.util.CollectionUtils; +import java.util.EnumMap; import java.util.List; @Service public class WithdrawalCashFlowComputer { - public CashFlowResult compute(List cashFlow) { - long amount = 0L; - long systemFee = 0L; - long providerFee = 0L; - long externalFee = 0L; + public WithdrawalCashFlowResult compute(List cashFlow) { + EnumMap cashFlowMap = new EnumMap<>(WithdrawalCashFlowType.class); if (CollectionUtils.isEmpty(cashFlow)) { - return CashFlowResult.EMPTY; + return WithdrawalCashFlowResult.EMPTY; } for (FinalCashFlowPosting posting : cashFlow) { - if (posting == null || !posting.isSetSource() || !posting.isSetDestination() || !posting.isSetVolume()) { + if (posting == null || !posting.isSetSource() || !posting.isSetDestination() || !posting.isSetVolume() + || !posting.getSource().isSetAccountType() || !posting.getDestination().isSetAccountType()) { continue; } - if (isAmount(posting)) { - amount += posting.getVolume().getAmount(); - } + WithdrawalCashFlowType type = getCashFlowType( + posting.getSource().getAccountType(), + posting.getDestination().getAccountType()); + cashFlowMap.put(type, posting.getVolume().getAmount()); + } - if (isSystemFee(posting)) { - systemFee += posting.getVolume().getAmount(); - } + return WithdrawalCashFlowResult.builder() + .amount(cashFlowMap.getOrDefault(WithdrawalCashFlowType.AMOUNT, 0L)) + .systemFee(cashFlowMap.getOrDefault(WithdrawalCashFlowType.FEE, 0L)) + .providerFee(cashFlowMap.getOrDefault(WithdrawalCashFlowType.PROVIDER_FEE, 0L)) + .build(); + } - if (isProviderFee(posting)) { - providerFee += posting.getVolume().getAmount(); - } + private WithdrawalCashFlowType getCashFlowType(CashFlowAccount source, CashFlowAccount destination) { + if (isWalletSenderSettlement(source) && isWalletReceiverDestination(destination)) { + return WithdrawalCashFlowType.AMOUNT; + } - if (isExternalFee(posting)) { - externalFee += posting.getVolume().getAmount(); - } + if (isWalletSenderSettlement(source) && isSystemSettlement(destination)) { + return WithdrawalCashFlowType.FEE; } - return CashFlowResult.builder() - .amount(amount) - .guaranteeDeposit(0L) - .systemFee(systemFee) - .providerFee(providerFee) - .externalFee(externalFee) - .build(); + if (isSystemSettlement(source) && isProviderSettlement(destination)) { + return WithdrawalCashFlowType.PROVIDER_FEE; + } + + if (isMerchantSettlement(source) && isProviderSettlement(destination)) { + return WithdrawalCashFlowType.REFUND_AMOUNT; + } + + return WithdrawalCashFlowType.UNKNOWN; + } + + private boolean isWalletSenderSettlement(CashFlowAccount account) { + return account.isSetWallet() && account.getWallet() == WalletCashFlowAccount.sender_settlement; + } + + private boolean isWalletReceiverDestination(CashFlowAccount account) { + return account.isSetWallet() && account.getWallet() == WalletCashFlowAccount.receiver_destination; } - private boolean isAmount(FinalCashFlowPosting posting) { - return posting.getSource().getAccountType().isSetMerchant() - && posting.getSource().getAccountType().getMerchant() == MerchantCashFlowAccount.settlement - && posting.getDestination().getAccountType().isSetMerchant() - && posting.getDestination().getAccountType().getMerchant() == MerchantCashFlowAccount.payout; + private boolean isSystemSettlement(CashFlowAccount account) { + return account.isSetSystem() && account.getSystem() == SystemCashFlowAccount.settlement; } - private boolean isSystemFee(FinalCashFlowPosting posting) { - return posting.getSource().getAccountType().isSetMerchant() - && posting.getSource().getAccountType().getMerchant() == MerchantCashFlowAccount.settlement - && posting.getDestination().getAccountType().isSetSystem() - && posting.getDestination().getAccountType().getSystem() == SystemCashFlowAccount.settlement; + private boolean isProviderSettlement(CashFlowAccount account) { + return account.isSetProvider() && account.getProvider() == ProviderCashFlowAccount.settlement; } - private boolean isProviderFee(FinalCashFlowPosting posting) { - return posting.getSource().getAccountType().isSetSystem() - && posting.getSource().getAccountType().getSystem() == SystemCashFlowAccount.settlement - && posting.getDestination().getAccountType().isSetProvider() - && posting.getDestination().getAccountType().getProvider() == ProviderCashFlowAccount.settlement; + private boolean isMerchantSettlement(CashFlowAccount account) { + return account.isSetMerchant() && account.getMerchant() == MerchantCashFlowAccount.settlement; } - private boolean isExternalFee(FinalCashFlowPosting posting) { - return posting.getSource().getAccountType().isSetSystem() - && posting.getSource().getAccountType().getSystem() == SystemCashFlowAccount.settlement - && posting.getDestination().getAccountType().isSetExternal() - && (posting.getDestination().getAccountType().getExternal() == ExternalCashFlowAccount.income - || posting.getDestination().getAccountType().getExternal() == ExternalCashFlowAccount.outcome); + private enum WithdrawalCashFlowType { + AMOUNT, + FEE, + PROVIDER_FEE, + REFUND_AMOUNT, + UNKNOWN } } diff --git a/src/main/java/dev/vality/analytics/dao/model/WithdrawalRow.java b/src/main/java/dev/vality/analytics/dao/model/WithdrawalRow.java index 1345a97b..3c720ec8 100644 --- a/src/main/java/dev/vality/analytics/dao/model/WithdrawalRow.java +++ b/src/main/java/dev/vality/analytics/dao/model/WithdrawalRow.java @@ -24,10 +24,8 @@ public class WithdrawalRow { private String providerId; private String terminal; private long amount; - private long guaranteeDeposit; private long systemFee; private long providerFee; - private long externalFee; private String currency; private WithdrawalStatus status; diff --git a/src/main/java/dev/vality/analytics/dao/model/WithdrawalStateSnapshot.java b/src/main/java/dev/vality/analytics/dao/model/WithdrawalStateSnapshot.java index d26f2bc5..9bf1da0f 100644 --- a/src/main/java/dev/vality/analytics/dao/model/WithdrawalStateSnapshot.java +++ b/src/main/java/dev/vality/analytics/dao/model/WithdrawalStateSnapshot.java @@ -22,7 +22,6 @@ public class WithdrawalStateSnapshot { private Long amount; private Long systemFee; private Long providerFee; - private Long externalFee; private LocalDateTime withdrawalCreatedAt; private String providerId; private String terminal; diff --git a/src/main/java/dev/vality/analytics/dao/repository/clickhouse/ClickHouseWithdrawalBatchPreparedStatementSetter.java b/src/main/java/dev/vality/analytics/dao/repository/clickhouse/ClickHouseWithdrawalBatchPreparedStatementSetter.java index 74d63810..9058d818 100644 --- a/src/main/java/dev/vality/analytics/dao/repository/clickhouse/ClickHouseWithdrawalBatchPreparedStatementSetter.java +++ b/src/main/java/dev/vality/analytics/dao/repository/clickhouse/ClickHouseWithdrawalBatchPreparedStatementSetter.java @@ -15,9 +15,8 @@ public class ClickHouseWithdrawalBatchPreparedStatementSetter implements BatchPr public static final String INSERT = "INSERT INTO analytic.events_sink_withdrawal " + "(timestamp, eventTime, eventTimeHour, partyId, withdrawalId, sequenceId, withdrawalTime, " + "walletId, " + - "destinationId, providerId, terminal, amount, guaranteeDeposit, systemFee, providerFee, " + - "externalFee, " + - "currency, status) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)"; + "destinationId, providerId, terminal, amount, systemFee, providerFee, " + + "currency, status) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)"; private final List batch; @@ -43,10 +42,8 @@ public void setValues(PreparedStatement ps, int i) throws SQLException { ps.setString(column++, defaultString(row.getProviderId())); ps.setString(column++, defaultString(row.getTerminal())); ps.setLong(column++, safeUnsigned(row.getAmount())); - ps.setLong(column++, safeUnsigned(row.getGuaranteeDeposit())); ps.setLong(column++, safeUnsigned(row.getSystemFee())); ps.setLong(column++, safeUnsigned(row.getProviderFee())); - ps.setLong(column++, safeUnsigned(row.getExternalFee())); ps.setString(column++, defaultString(row.getCurrency())); ps.setString(column, row.getStatus().name()); } diff --git a/src/main/java/dev/vality/analytics/dao/repository/postgres/PostgresWithdrawalStateRepository.java b/src/main/java/dev/vality/analytics/dao/repository/postgres/PostgresWithdrawalStateRepository.java index 0a382266..ce9880fc 100644 --- a/src/main/java/dev/vality/analytics/dao/repository/postgres/PostgresWithdrawalStateRepository.java +++ b/src/main/java/dev/vality/analytics/dao/repository/postgres/PostgresWithdrawalStateRepository.java @@ -18,9 +18,9 @@ public class PostgresWithdrawalStateRepository { private static final String UPSERT = "INSERT INTO analytics.withdrawal_state " + "(withdrawal_id, party_id, wallet_id, destination_id, currency, requested_amount, amount, " + "system_fee, " + - "provider_fee, external_fee, withdrawal_created_at, provider_id, terminal, " + + "provider_fee, withdrawal_created_at, provider_id, terminal, " + "last_sequence_id, " + - "updated_at) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?) " + + "updated_at) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?) " + "ON CONFLICT (withdrawal_id) DO UPDATE SET " + "party_id = EXCLUDED.party_id, " + "wallet_id = EXCLUDED.wallet_id, " + @@ -30,14 +30,13 @@ public class PostgresWithdrawalStateRepository { "amount = EXCLUDED.amount, " + "system_fee = EXCLUDED.system_fee, " + "provider_fee = EXCLUDED.provider_fee, " + - "external_fee = EXCLUDED.external_fee, " + "withdrawal_created_at = EXCLUDED.withdrawal_created_at, " + "provider_id = EXCLUDED.provider_id, " + "terminal = EXCLUDED.terminal, " + "last_sequence_id = EXCLUDED.last_sequence_id, " + "updated_at = EXCLUDED.updated_at"; private static final String SELECT = "SELECT withdrawal_id, party_id, wallet_id, destination_id, currency, " + - "requested_amount, amount, system_fee, provider_fee, external_fee, withdrawal_created_at, " + + "requested_amount, amount, system_fee, provider_fee, withdrawal_created_at, " + "provider_id, terminal, last_sequence_id, updated_at " + "FROM analytics.withdrawal_state " + "WHERE withdrawal_id = ?"; @@ -65,7 +64,6 @@ public void upsert(WithdrawalStateSnapshot snapshot) { snapshot.getAmount(), snapshot.getSystemFee(), snapshot.getProviderFee(), - snapshot.getExternalFee(), snapshot.getWithdrawalCreatedAt(), snapshot.getProviderId(), snapshot.getTerminal(), @@ -86,7 +84,6 @@ private WithdrawalStateSnapshot map(ResultSet resultSet) throws SQLException { .amount((Long) resultSet.getObject("amount")) .systemFee((Long) resultSet.getObject("system_fee")) .providerFee((Long) resultSet.getObject("provider_fee")) - .externalFee((Long) resultSet.getObject("external_fee")) .withdrawalCreatedAt(resultSet.getTimestamp("withdrawal_created_at") != null ? resultSet.getTimestamp("withdrawal_created_at").toLocalDateTime() : null) diff --git a/src/main/java/dev/vality/analytics/domain/WithdrawalCashFlowResult.java b/src/main/java/dev/vality/analytics/domain/WithdrawalCashFlowResult.java new file mode 100644 index 00000000..04e9520e --- /dev/null +++ b/src/main/java/dev/vality/analytics/domain/WithdrawalCashFlowResult.java @@ -0,0 +1,15 @@ +package dev.vality.analytics.domain; + +import lombok.Builder; +import lombok.Data; + +@Data +@Builder +public class WithdrawalCashFlowResult { + + public static final WithdrawalCashFlowResult EMPTY = new WithdrawalCashFlowResult(0L, 0L, 0L); + + private final long amount; + private final long systemFee; + private final long providerFee; +} diff --git a/src/main/java/dev/vality/analytics/listener/handler/withdrawal/WithdrawalEventHandler.java b/src/main/java/dev/vality/analytics/listener/handler/withdrawal/WithdrawalEventHandler.java index 8334ad98..cca2eaa8 100644 --- a/src/main/java/dev/vality/analytics/listener/handler/withdrawal/WithdrawalEventHandler.java +++ b/src/main/java/dev/vality/analytics/listener/handler/withdrawal/WithdrawalEventHandler.java @@ -5,8 +5,8 @@ import dev.vality.analytics.dao.repository.clickhouse.ClickHouseWithdrawalRepository; import dev.vality.analytics.dao.repository.postgres.PostgresWithdrawalStateRepository; import dev.vality.analytics.listener.mapper.withdrawal.WithdrawalMapper; +import dev.vality.analytics.utils.TimestampUtil; import dev.vality.fistful.withdrawal.TimestampedChange; -import dev.vality.geck.common.util.TypeUtil; import dev.vality.machinegun.eventsink.MachineEvent; import dev.vality.sink.common.parser.impl.MachineEventParser; import lombok.RequiredArgsConstructor; @@ -50,7 +50,7 @@ public void handle(List batch) { continue; } - apply(result, stateCache, withdrawalRows); + applyMappingResult(result, stateCache, withdrawalRows); } clickHouseWithdrawalRepository.insertBatch(withdrawalRows); @@ -64,7 +64,7 @@ private WithdrawalEventContext prepareContext( return null; } - LocalDateTime eventTime = parseTime(timestampedChange.getOccuredAt()); + LocalDateTime eventTime = TimestampUtil.parseLocalDateTime(timestampedChange.getOccuredAt()); if (eventTime == null) { log.warn("Skipping withdrawal event with invalid occured_at, sourceId={}, eventId={}", machineEvent.getSourceId(), machineEvent.getEventId()); @@ -98,33 +98,22 @@ private WithdrawalMappingResult map(WithdrawalEventContext context) { if (mapper.accept(context.getTimestampedChange())) { WithdrawalMappingResult result = mapper.map(context.getTimestampedChange(), context); if (result == null) { - if (context.getCurrentState() == null) { - log.warn("Skipping {} change without reducer state, withdrawalId={}, eventId={}", - mapper.getChangeType(), - context.getWithdrawalId(), - context.getMachineEvent().getEventId()); - } else { - log.debug("Skipping {} change because mapper produced no update, withdrawalId={}, eventId={}", - mapper.getChangeType(), - context.getWithdrawalId(), - context.getMachineEvent().getEventId()); - } + logEmptyContextResult(context, mapper); } return result; } } - log.debug("No withdrawal mapper matched, withdrawalId={}, eventId={}", context.getWithdrawalId(), context.getMachineEvent().getEventId()); return null; } - private void apply( + private void applyMappingResult( WithdrawalMappingResult result, Map stateCache, List withdrawalRows) { if (result.getStateSnapshot() != null) { - upsert(result.getStateSnapshot(), stateCache); + cachedUpsert(result.getStateSnapshot(), stateCache); } if (result.getWithdrawalRow() != null) { @@ -132,7 +121,7 @@ private void apply( } } - private void upsert(WithdrawalStateSnapshot snapshot, Map stateCache) { + private void cachedUpsert(WithdrawalStateSnapshot snapshot, Map stateCache) { postgresWithdrawalStateRepository.upsert(snapshot); stateCache.put(snapshot.getWithdrawalId(), snapshot); } @@ -165,16 +154,17 @@ private String resolveWithdrawalId(MachineEvent machineEvent) { return null; } - private LocalDateTime parseTime(String timestamp) { - if (timestamp == null) { - return null; - } - - try { - return TypeUtil.stringToLocalDateTime(timestamp); - } catch (Exception e) { - log.warn("Failed to parse withdrawal timestamp '{}'", timestamp, e); - return null; + private void logEmptyContextResult(WithdrawalEventContext context, WithdrawalMapper mapper) { + if (context.getCurrentState() == null) { + log.warn("Skipping {} change without reducer state, withdrawalId={}, eventId={}", + mapper.getChangeType(), + context.getWithdrawalId(), + context.getMachineEvent().getEventId()); + } else { + log.debug("Skipping {} change because mapper produced no update, withdrawalId={}, eventId={}", + mapper.getChangeType(), + context.getWithdrawalId(), + context.getMachineEvent().getEventId()); } } } diff --git a/src/main/java/dev/vality/analytics/listener/mapper/rate/CurrencyEventMapper.java b/src/main/java/dev/vality/analytics/listener/mapper/rate/CurrencyEventMapper.java index f351a540..178e0945 100644 --- a/src/main/java/dev/vality/analytics/listener/mapper/rate/CurrencyEventMapper.java +++ b/src/main/java/dev/vality/analytics/listener/mapper/rate/CurrencyEventMapper.java @@ -1,12 +1,12 @@ package dev.vality.analytics.listener.mapper.rate; import dev.vality.analytics.domain.db.tables.pojos.Rate; +import dev.vality.analytics.utils.TimestampUtil; import dev.vality.exrates.base.Currency; import dev.vality.exrates.base.Rational; import dev.vality.exrates.events.CurrencyEvent; import dev.vality.exrates.events.CurrencyEventPayload; import dev.vality.exrates.events.CurrencyExchangeRate; -import dev.vality.geck.common.util.TypeUtil; import lombok.extern.slf4j.Slf4j; import org.springframework.stereotype.Component; @@ -69,12 +69,11 @@ private CurrencyExchangeRate validateAndGetExchangeRate(CurrencyEvent event, Str } private LocalDateTime parseEventTime(String timestamp, String eventId) { - try { - return TypeUtil.stringToLocalDateTime(timestamp); - } catch (Exception e) { - log.warn("Failed to parse timestamp '{}' for CurrencyEvent, eventId={}", timestamp, eventId, e); - return null; + LocalDateTime eventTime = TimestampUtil.parseLocalDateTime(timestamp); + if (eventTime == null) { + log.warn("Failed to parse timestamp '{}' for CurrencyEvent, eventId={}", timestamp, eventId); } + return eventTime; } private Rate buildRate( diff --git a/src/main/java/dev/vality/analytics/listener/mapper/withdrawal/AbstractWithdrawalMapper.java b/src/main/java/dev/vality/analytics/listener/mapper/withdrawal/AbstractWithdrawalMapper.java index 0d458a59..c62918f3 100644 --- a/src/main/java/dev/vality/analytics/listener/mapper/withdrawal/AbstractWithdrawalMapper.java +++ b/src/main/java/dev/vality/analytics/listener/mapper/withdrawal/AbstractWithdrawalMapper.java @@ -4,8 +4,8 @@ import dev.vality.analytics.listener.handler.withdrawal.WithdrawalEventContext; import dev.vality.analytics.listener.handler.withdrawal.WithdrawalMappingResult; import dev.vality.analytics.listener.mapper.AbstractMapper; +import dev.vality.analytics.utils.TimestampUtil; import dev.vality.fistful.withdrawal.TimestampedChange; -import dev.vality.geck.common.util.TypeUtil; import java.time.LocalDateTime; @@ -14,15 +14,7 @@ public abstract class AbstractWithdrawalMapper implements WithdrawalMapper { protected LocalDateTime parseTime(String timestamp) { - if (timestamp == null) { - return null; - } - - try { - return TypeUtil.stringToLocalDateTime(timestamp); - } catch (Exception e) { - return null; - } + return TimestampUtil.parseLocalDateTime(timestamp); } protected String extractProviderId(dev.vality.fistful.withdrawal.Route route) { diff --git a/src/main/java/dev/vality/analytics/listener/mapper/withdrawal/WithdrawalStatusMapper.java b/src/main/java/dev/vality/analytics/listener/mapper/withdrawal/WithdrawalStatusMapper.java index 4ad15212..8b604d7e 100644 --- a/src/main/java/dev/vality/analytics/listener/mapper/withdrawal/WithdrawalStatusMapper.java +++ b/src/main/java/dev/vality/analytics/listener/mapper/withdrawal/WithdrawalStatusMapper.java @@ -44,10 +44,8 @@ public WithdrawalMappingResult map(TimestampedChange change, WithdrawalEventCont .terminal(updatedState.getTerminal()) .amount(Optional.ofNullable(updatedState.getAmount()) .orElse(Optional.ofNullable(updatedState.getRequestedAmount()).orElse(0L))) - .guaranteeDeposit(0L) .systemFee(Optional.ofNullable(updatedState.getSystemFee()).orElse(0L)) .providerFee(Optional.ofNullable(updatedState.getProviderFee()).orElse(0L)) - .externalFee(Optional.ofNullable(updatedState.getExternalFee()).orElse(0L)) .currency(updatedState.getCurrency()) .status(status) .build(); diff --git a/src/main/java/dev/vality/analytics/listener/mapper/withdrawal/WithdrawalTransferMapper.java b/src/main/java/dev/vality/analytics/listener/mapper/withdrawal/WithdrawalTransferMapper.java index 0d0d3f5e..caf3c9b1 100644 --- a/src/main/java/dev/vality/analytics/listener/mapper/withdrawal/WithdrawalTransferMapper.java +++ b/src/main/java/dev/vality/analytics/listener/mapper/withdrawal/WithdrawalTransferMapper.java @@ -3,7 +3,7 @@ import dev.vality.analytics.computer.WithdrawalCashFlowComputer; import dev.vality.analytics.constant.EventType; import dev.vality.analytics.dao.model.WithdrawalStateSnapshot; -import dev.vality.analytics.domain.CashFlowResult; +import dev.vality.analytics.domain.WithdrawalCashFlowResult; import dev.vality.analytics.listener.handler.withdrawal.WithdrawalEventContext; import dev.vality.analytics.listener.handler.withdrawal.WithdrawalMappingResult; import dev.vality.fistful.cashflow.FinalCashFlow; @@ -33,7 +33,7 @@ public WithdrawalMappingResult map(TimestampedChange change, WithdrawalEventCont dev.vality.fistful.transfer.Change transferChange = change.getChange().getTransfer().getPayload(); FinalCashFlow cashFlow = transferChange.getCreated().getTransfer().getCashflow(); - CashFlowResult cashFlowResult = withdrawalCashFlowComputer.compute( + WithdrawalCashFlowResult cashFlowResult = withdrawalCashFlowComputer.compute( cashFlow != null ? cashFlow.getPostings() : null); return WithdrawalMappingResult.builder() @@ -41,7 +41,6 @@ public WithdrawalMappingResult map(TimestampedChange change, WithdrawalEventCont .amount(cashFlowResult.getAmount()) .systemFee(cashFlowResult.getSystemFee()) .providerFee(cashFlowResult.getProviderFee()) - .externalFee(cashFlowResult.getExternalFee()) .lastSequenceId(context.getMachineEvent().getEventId()) .updatedAt(context.getEventTime()) .build()) diff --git a/src/main/java/dev/vality/analytics/utils/TimestampUtil.java b/src/main/java/dev/vality/analytics/utils/TimestampUtil.java new file mode 100644 index 00000000..2d0e3673 --- /dev/null +++ b/src/main/java/dev/vality/analytics/utils/TimestampUtil.java @@ -0,0 +1,23 @@ +package dev.vality.analytics.utils; + +import dev.vality.geck.common.util.TypeUtil; +import lombok.experimental.UtilityClass; + +import java.time.LocalDateTime; + +@UtilityClass +public class TimestampUtil { + + public static LocalDateTime parseLocalDateTime(String timestamp) { + if (timestamp == null) { + return null; + } + + try { + return TypeUtil.stringToLocalDateTime(timestamp); + } catch (Exception e) { + return null; + } + } + +} diff --git a/src/main/resources/db/migration-clickhouse/non-sharded/V4__create_withdrawal_table.sql b/src/main/resources/db/migration-clickhouse/non-sharded/V4__create_withdrawal_table.sql index 6e2837b0..ba1da497 100644 --- a/src/main/resources/db/migration-clickhouse/non-sharded/V4__create_withdrawal_table.sql +++ b/src/main/resources/db/migration-clickhouse/non-sharded/V4__create_withdrawal_table.sql @@ -11,12 +11,10 @@ CREATE TABLE IF NOT EXISTS analytic.events_sink_withdrawal ( providerId String, terminal String, amount UInt64, - guaranteeDeposit UInt64, systemFee UInt64, providerFee UInt64, - externalFee UInt64, currency String, status Enum8('pending' = 1, 'succeeded' = 2, 'failed' = 3) ) ENGINE = ReplacingMergeTree() PARTITION BY toYYYYMM(timestamp) -ORDER BY (eventTimeHour, partyId, status, currency, providerId, terminal, withdrawalId, sequenceId); +ORDER BY (eventTimeHour, partyId, walletId, status, currency, providerId, terminal, withdrawalId, sequenceId); diff --git a/src/main/resources/db/migration-clickhouse/sharded/V4__create_withdrawal_table.sql b/src/main/resources/db/migration-clickhouse/sharded/V4__create_withdrawal_table.sql index 10e1dad4..c89cf4b7 100644 --- a/src/main/resources/db/migration-clickhouse/sharded/V4__create_withdrawal_table.sql +++ b/src/main/resources/db/migration-clickhouse/sharded/V4__create_withdrawal_table.sql @@ -11,15 +11,13 @@ CREATE TABLE IF NOT EXISTS analytic.events_sink_withdrawal_local ( providerId String, terminal String, amount UInt64, - guaranteeDeposit UInt64, systemFee UInt64, providerFee UInt64, - externalFee UInt64, currency String, status Enum8('pending' = 1, 'succeeded' = 2, 'failed' = 3) ) ENGINE = ReplicatedReplacingMergeTree('/clickhouse/<>/tables/<>/{database}/{table}', '<>') PARTITION BY toYYYYMM(timestamp) -ORDER BY (eventTimeHour, partyId, status, currency, providerId, terminal, withdrawalId, sequenceId); +ORDER BY (eventTimeHour, partyId, walletId, status, currency, providerId, terminal, withdrawalId, sequenceId); CREATE TABLE IF NOT EXISTS analytic.events_sink_withdrawal AS analytic.events_sink_withdrawal_local ENGINE = Distributed('<>', analytic, events_sink_withdrawal_local, cityHash64(timestamp, partyId)); diff --git a/src/main/resources/db/migration/V21__add_withdrawal_state.sql b/src/main/resources/db/migration/V21__add_withdrawal_state.sql index c20a4f6f..43055ee1 100644 --- a/src/main/resources/db/migration/V21__add_withdrawal_state.sql +++ b/src/main/resources/db/migration/V21__add_withdrawal_state.sql @@ -8,7 +8,6 @@ CREATE TABLE analytics.withdrawal_state ( amount BIGINT, system_fee BIGINT, provider_fee BIGINT, - external_fee BIGINT, withdrawal_created_at TIMESTAMP WITHOUT TIME ZONE, provider_id VARCHAR, terminal VARCHAR, diff --git a/src/test/java/dev/vality/analytics/computer/WithdrawalCashFlowComputerTest.java b/src/test/java/dev/vality/analytics/computer/WithdrawalCashFlowComputerTest.java index 23bc55ef..6886dc91 100644 --- a/src/test/java/dev/vality/analytics/computer/WithdrawalCashFlowComputerTest.java +++ b/src/test/java/dev/vality/analytics/computer/WithdrawalCashFlowComputerTest.java @@ -1,6 +1,6 @@ package dev.vality.analytics.computer; -import dev.vality.analytics.domain.CashFlowResult; +import dev.vality.analytics.domain.WithdrawalCashFlowResult; import dev.vality.analytics.utils.WithdrawalEventTestUtils; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; @@ -21,28 +21,39 @@ public void setUp() { @Test public void shouldMapAllSupportedPostingTypes() { - CashFlowResult result = withdrawalCashFlowComputer.compute(List.of( - WithdrawalEventTestUtils.merchantToPayout(1000L), - WithdrawalEventTestUtils.merchantToSystem(100L), + WithdrawalCashFlowResult result = withdrawalCashFlowComputer.compute(List.of( + WithdrawalEventTestUtils.walletSenderSettlementToReceiverDestination(1000L), + WithdrawalEventTestUtils.walletSenderSettlementToSystem(100L), WithdrawalEventTestUtils.systemToProvider(20L), WithdrawalEventTestUtils.systemToExternal(10L))); assertThat(result.getAmount(), is(1000L)); assertThat(result.getSystemFee(), is(100L)); assertThat(result.getProviderFee(), is(20L)); - assertThat(result.getExternalFee(), is(10L)); - assertThat(result.getGuaranteeDeposit(), is(0L)); } @Test public void shouldIgnoreUnsupportedPostings() { - CashFlowResult result = withdrawalCashFlowComputer.compute(List.of( + WithdrawalCashFlowResult result = withdrawalCashFlowComputer.compute(List.of( WithdrawalEventTestUtils.unrelatedPosting(999L))); assertThat(result.getAmount(), is(0L)); assertThat(result.getSystemFee(), is(0L)); assertThat(result.getProviderFee(), is(0L)); - assertThat(result.getExternalFee(), is(0L)); - assertThat(result.getGuaranteeDeposit(), is(0L)); + } + + @Test + public void shouldUseLastPostingPerTypeLikeFistfulAssociateBy() { + WithdrawalCashFlowResult result = withdrawalCashFlowComputer.compute(List.of( + WithdrawalEventTestUtils.walletSenderSettlementToReceiverDestination(1000L), + WithdrawalEventTestUtils.walletSenderSettlementToReceiverDestination(900L), + WithdrawalEventTestUtils.walletSenderSettlementToSystem(100L), + WithdrawalEventTestUtils.walletSenderSettlementToSystem(90L), + WithdrawalEventTestUtils.systemToProvider(20L), + WithdrawalEventTestUtils.systemToProvider(15L))); + + assertThat(result.getAmount(), is(900L)); + assertThat(result.getSystemFee(), is(90L)); + assertThat(result.getProviderFee(), is(15L)); } } diff --git a/src/test/java/dev/vality/analytics/listener/WithdrawalListenerTest.java b/src/test/java/dev/vality/analytics/listener/WithdrawalListenerTest.java index 94bfdaa2..e8c7fe34 100644 --- a/src/test/java/dev/vality/analytics/listener/WithdrawalListenerTest.java +++ b/src/test/java/dev/vality/analytics/listener/WithdrawalListenerTest.java @@ -53,12 +53,11 @@ public void shouldReduceWithdrawalStateAndWriteClickHouseSnapshot() { assertEquals(1200L, ((Number) stateRow.get("amount")).longValue()); assertEquals(100L, ((Number) stateRow.get("system_fee")).longValue()); assertEquals(20L, ((Number) stateRow.get("provider_fee")).longValue()); - assertEquals(10L, ((Number) stateRow.get("external_fee")).longValue()); assertEquals("42", stateRow.get("provider_id")); assertEquals("24", stateRow.get("terminal")); Map withdrawalRow = clickHouseJdbcTemplate.queryForMap( - "SELECT partyId, currency, providerId, terminal, amount, systemFee, providerFee, externalFee, status " + + "SELECT partyId, currency, providerId, terminal, amount, systemFee, providerFee, status " + "FROM analytic.events_sink_withdrawal WHERE withdrawalId = ? AND status = 'succeeded'", WithdrawalEventTestUtils.WITHDRAWAL_ID); assertEquals(WithdrawalEventTestUtils.PARTY_ID, withdrawalRow.get("partyId")); @@ -68,7 +67,6 @@ public void shouldReduceWithdrawalStateAndWriteClickHouseSnapshot() { assertEquals(1200L, ((Number) withdrawalRow.get("amount")).longValue()); assertEquals(100L, ((Number) withdrawalRow.get("systemFee")).longValue()); assertEquals(20L, ((Number) withdrawalRow.get("providerFee")).longValue()); - assertEquals(10L, ((Number) withdrawalRow.get("externalFee")).longValue()); clickHouseJdbcTemplate.execute("SYSTEM RELOAD DICTIONARY analytic.shop_dictionary"); String locationUrl = clickHouseJdbcTemplate.queryForObject( diff --git a/src/test/java/dev/vality/analytics/listener/handler/withdrawal/WithdrawalEventHandlerTest.java b/src/test/java/dev/vality/analytics/listener/handler/withdrawal/WithdrawalEventHandlerTest.java index 9e95dc62..cf90eff5 100644 --- a/src/test/java/dev/vality/analytics/listener/handler/withdrawal/WithdrawalEventHandlerTest.java +++ b/src/test/java/dev/vality/analytics/listener/handler/withdrawal/WithdrawalEventHandlerTest.java @@ -119,9 +119,9 @@ public void transferCreatedShouldUpdateMonetaryFields() { .build()); MachineEvent machineEvent = WithdrawalEventTestUtils.machineEvent(3L, - WithdrawalEventTestUtils.transferCreatedChange(1200L, 100L, 20L, 10L)); + WithdrawalEventTestUtils.transferCreatedChange(1200L, 100L, 20L)); when(machineEventParser.parse(machineEvent)) - .thenReturn(WithdrawalEventTestUtils.transferCreatedChange(1200L, 100L, 20L, 10L)); + .thenReturn(WithdrawalEventTestUtils.transferCreatedChange(1200L, 100L, 20L)); withdrawalEventHandler.handle(List.of(machineEvent)); @@ -129,7 +129,6 @@ public void transferCreatedShouldUpdateMonetaryFields() { assertEquals(1200L, snapshot.getAmount()); assertEquals(100L, snapshot.getSystemFee()); assertEquals(20L, snapshot.getProviderFee()); - assertEquals(10L, snapshot.getExternalFee()); } @Test @@ -146,7 +145,6 @@ public void statusChangedShouldWriteSnapshotRowUsingCurrentState() { .amount(1200L) .systemFee(100L) .providerFee(20L) - .externalFee(10L) .lastSequenceId(3L) .build()); @@ -169,7 +167,6 @@ public void statusChangedShouldWriteSnapshotRowUsingCurrentState() { assertThat(row.getAmount(), is(1200L)); assertThat(row.getSystemFee(), is(100L)); assertThat(row.getProviderFee(), is(20L)); - assertThat(row.getExternalFee(), is(10L)); assertThat(row.getStatus().name(), is("succeeded")); } @@ -199,7 +196,6 @@ public void statusChangedShouldFallbackToRequestedAmountWhenTransferMissing() { assertThat(row.getAmount(), is(1500L)); assertThat(row.getSystemFee(), is(0L)); assertThat(row.getProviderFee(), is(0L)); - assertThat(row.getExternalFee(), is(0L)); assertThat(row.getStatus().name(), is("pending")); } } diff --git a/src/test/java/dev/vality/analytics/utils/WithdrawalEventTestUtils.java b/src/test/java/dev/vality/analytics/utils/WithdrawalEventTestUtils.java index 8d72961a..76eaaadb 100644 --- a/src/test/java/dev/vality/analytics/utils/WithdrawalEventTestUtils.java +++ b/src/test/java/dev/vality/analytics/utils/WithdrawalEventTestUtils.java @@ -7,9 +7,9 @@ import dev.vality.fistful.cashflow.FinalCashFlow; import dev.vality.fistful.cashflow.FinalCashFlowAccount; import dev.vality.fistful.cashflow.FinalCashFlowPosting; -import dev.vality.fistful.cashflow.MerchantCashFlowAccount; import dev.vality.fistful.cashflow.ProviderCashFlowAccount; import dev.vality.fistful.cashflow.SystemCashFlowAccount; +import dev.vality.fistful.cashflow.WalletCashFlowAccount; import dev.vality.fistful.transfer.Transfer; import dev.vality.fistful.withdrawal.Change; import dev.vality.fistful.withdrawal.CreatedChange; @@ -81,13 +81,11 @@ public static TimestampedChange routeChange(int providerId, Integer terminalId) public static TimestampedChange transferCreatedChange( long amount, long systemFee, - long providerFee, - long externalFee) { + long providerFee) { FinalCashFlow cashFlow = new FinalCashFlow().setPostings(List.of( - merchantToPayout(amount), - merchantToSystem(systemFee), + walletSenderSettlementToReceiverDestination(amount), + walletSenderSettlementToSystem(systemFee), systemToProvider(providerFee), - systemToExternal(externalFee), unrelatedPosting(999L))); Transfer transfer = new Transfer() @@ -130,20 +128,20 @@ public static SinkEvent sinkEvent(long eventId, TimestampedChange timestampedCha return List.of( sinkEvent(1L, createdChange(1500L, null, null)), sinkEvent(2L, routeChange(42, 24)), - sinkEvent(3L, transferCreatedChange(1200L, 100L, 20L, 10L)), + sinkEvent(3L, transferCreatedChange(1200L, 100L, 20L)), sinkEvent(4L, succeededStatusChange())); } - public static FinalCashFlowPosting merchantToPayout(long amount) { + public static FinalCashFlowPosting walletSenderSettlementToReceiverDestination(long amount) { return posting( - CashFlowAccount.merchant(MerchantCashFlowAccount.settlement), - CashFlowAccount.merchant(MerchantCashFlowAccount.payout), + CashFlowAccount.wallet(WalletCashFlowAccount.sender_settlement), + CashFlowAccount.wallet(WalletCashFlowAccount.receiver_destination), amount); } - public static FinalCashFlowPosting merchantToSystem(long amount) { + public static FinalCashFlowPosting walletSenderSettlementToSystem(long amount) { return posting( - CashFlowAccount.merchant(MerchantCashFlowAccount.settlement), + CashFlowAccount.wallet(WalletCashFlowAccount.sender_settlement), CashFlowAccount.system(SystemCashFlowAccount.settlement), amount); }