Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,11 @@
<artifactId>machinegun-proto</artifactId>
<version>1.43-3decc8f</version>
</dependency>
<dependency>
<groupId>dev.vality</groupId>
<artifactId>fistful-proto</artifactId>
<version>1.188-f7ce08e</version>
</dependency>
<dependency>
<groupId>dev.vality.geck</groupId>
<artifactId>filter</artifactId>
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,92 @@
package dev.vality.analytics.computer;

import dev.vality.analytics.domain.WithdrawalCashFlowResult;
import dev.vality.fistful.cashflow.CashFlowAccount;
import dev.vality.fistful.cashflow.FinalCashFlowPosting;
import dev.vality.fistful.cashflow.MerchantCashFlowAccount;
import dev.vality.fistful.cashflow.ProviderCashFlowAccount;
import dev.vality.fistful.cashflow.SystemCashFlowAccount;
import dev.vality.fistful.cashflow.WalletCashFlowAccount;
import org.springframework.stereotype.Service;
import org.springframework.util.CollectionUtils;

import java.util.EnumMap;
import java.util.List;

@Service
public class WithdrawalCashFlowComputer {

public WithdrawalCashFlowResult compute(List<FinalCashFlowPosting> cashFlow) {
EnumMap<WithdrawalCashFlowType, Long> cashFlowMap = new EnumMap<>(WithdrawalCashFlowType.class);

if (CollectionUtils.isEmpty(cashFlow)) {
return WithdrawalCashFlowResult.EMPTY;
}

for (FinalCashFlowPosting posting : cashFlow) {
if (posting == null || !posting.isSetSource() || !posting.isSetDestination() || !posting.isSetVolume()
|| !posting.getSource().isSetAccountType() || !posting.getDestination().isSetAccountType()) {
continue;
}

WithdrawalCashFlowType type = getCashFlowType(
posting.getSource().getAccountType(),
posting.getDestination().getAccountType());
cashFlowMap.put(type, posting.getVolume().getAmount());
}

return WithdrawalCashFlowResult.builder()
.amount(cashFlowMap.getOrDefault(WithdrawalCashFlowType.AMOUNT, 0L))
.systemFee(cashFlowMap.getOrDefault(WithdrawalCashFlowType.FEE, 0L))
.providerFee(cashFlowMap.getOrDefault(WithdrawalCashFlowType.PROVIDER_FEE, 0L))
.build();
}

private WithdrawalCashFlowType getCashFlowType(CashFlowAccount source, CashFlowAccount destination) {
if (isWalletSenderSettlement(source) && isWalletReceiverDestination(destination)) {
return WithdrawalCashFlowType.AMOUNT;
}

if (isWalletSenderSettlement(source) && isSystemSettlement(destination)) {
return WithdrawalCashFlowType.FEE;
}

if (isSystemSettlement(source) && isProviderSettlement(destination)) {
return WithdrawalCashFlowType.PROVIDER_FEE;
}

if (isMerchantSettlement(source) && isProviderSettlement(destination)) {
return WithdrawalCashFlowType.REFUND_AMOUNT;
}

return WithdrawalCashFlowType.UNKNOWN;
}

private boolean isWalletSenderSettlement(CashFlowAccount account) {
return account.isSetWallet() && account.getWallet() == WalletCashFlowAccount.sender_settlement;
}

private boolean isWalletReceiverDestination(CashFlowAccount account) {
return account.isSetWallet() && account.getWallet() == WalletCashFlowAccount.receiver_destination;
}

private boolean isSystemSettlement(CashFlowAccount account) {
return account.isSetSystem() && account.getSystem() == SystemCashFlowAccount.settlement;
}

private boolean isProviderSettlement(CashFlowAccount account) {
return account.isSetProvider() && account.getProvider() == ProviderCashFlowAccount.settlement;
}

private boolean isMerchantSettlement(CashFlowAccount account) {
return account.isSetMerchant() && account.getMerchant() == MerchantCashFlowAccount.settlement;
}

private enum WithdrawalCashFlowType {
AMOUNT,
FEE,
PROVIDER_FEE,
REFUND_AMOUNT,
UNKNOWN
}
}
13 changes: 13 additions & 0 deletions src/main/java/dev/vality/analytics/config/KafkaConfig.java
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ public class KafkaConfig {
private static final String RESULT_ANALYTICS = "result-analytics";
private static final String PARTY_RESULT_ANALYTICS = "party-result-analytics";
private static final String DOMINANT_ANALYTICS = "dominant-analytics";
private static final String WITHDRAWAL_ANALYTICS = "withdrawal-analytics";
private final ConsumerGroupIdService consumerGroupIdService;
private final KafkaProperties kafkaProperties;

Expand All @@ -53,6 +54,8 @@ public class KafkaConfig {
private String maxPollRecordsDominantListener;
@Value("${kafka.topic.rate.max.poll.records}")
private String maxPollRecordsRatesListener;
@Value("${kafka.topic.withdrawal.max.poll.records}")
private String maxPollRecordsWithdrawalListener;
@Value("${kafka.consumer.concurrency}")
private int concurrencyListeners;
@Value("${kafka.topic.rate.groupId}")
Expand Down Expand Up @@ -88,6 +91,16 @@ public ConcurrentKafkaListenerContainerFactory<String, HistoricalCommit> dominan
return factory;
}

@Bean
public ConcurrentKafkaListenerContainerFactory<String, MachineEvent> withdrawalListenerContainerFactory() {
ConcurrentKafkaListenerContainerFactory<String, MachineEvent> factory =
new ConcurrentKafkaListenerContainerFactory<>();
String consumerGroup = consumerGroupIdService.generateGroupId(WITHDRAWAL_ANALYTICS);
initDefaultListenerProperties(factory, consumerGroup,
new MachineEventDeserializer(), maxPollRecordsWithdrawalListener);
return factory;
}

@Bean
public ConcurrentKafkaListenerContainerFactory<String, CurrencyEvent> rateContainerFactory() {
ConcurrentKafkaListenerContainerFactory<String, CurrencyEvent> factory =
Expand Down
17 changes: 17 additions & 0 deletions src/main/java/dev/vality/analytics/config/SerializeConfig.java
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
package dev.vality.analytics.config;

import dev.vality.fistful.withdrawal.TimestampedChange;
import dev.vality.damsel.domain_config_v2.HistoricalCommit;
import dev.vality.exrates.events.CurrencyEvent;
import dev.vality.geck.serializer.Geck;
Expand Down Expand Up @@ -33,4 +34,20 @@ public HistoricalCommit deserialize(byte[] bytes) {
});
}

@Bean
public BinaryDeserializer<TimestampedChange> withdrawalTimestampedChangeBinaryDeserializer() {
return new AbstractThriftBinaryDeserializer<>() {
@Override
public TimestampedChange deserialize(byte[] bytes) {
return Geck.msgPackToTBase(bytes, TimestampedChange.class);
}
};
}

@Bean
public MachineEventParser<TimestampedChange> withdrawalTimestampedChangeMachineEventParser(
BinaryDeserializer<TimestampedChange> withdrawalTimestampedChangeBinaryDeserializer) {
return new MachineEventParser<>(withdrawalTimestampedChangeBinaryDeserializer);
}

}
5 changes: 4 additions & 1 deletion src/main/java/dev/vality/analytics/constant/EventType.java
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,10 @@ public enum EventType {
".payload.session_transaction_bound", new IsNullCondition().not()),
INVOICE_PAYMENT_RISK_SCORE_CHANGED("invoice_payment_change.payload.invoice_payment_risk_score_changed",
new IsNullCondition().not()),
RATE_CREATED("created", new IsNullCondition().not());
WITHDRAWAL_CREATED("change.created", new IsNullCondition().not()),
WITHDRAWAL_ROUTE_CHANGED("change.route", new IsNullCondition().not()),
WITHDRAWAL_TRANSFER_CHANGED("change.transfer.payload.created", new IsNullCondition().not()),
WITHDRAWAL_STATUS_CHANGED("change.status_changed", new IsNullCondition().not());

Filter filter;

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
package dev.vality.analytics.constant;

public enum WithdrawalStatus {
pending,
succeeded,
failed
}
32 changes: 32 additions & 0 deletions src/main/java/dev/vality/analytics/dao/model/WithdrawalRow.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
package dev.vality.analytics.dao.model;

import dev.vality.analytics.constant.WithdrawalStatus;
import lombok.AllArgsConstructor;
import lombok.Builder;
import lombok.Data;
import lombok.NoArgsConstructor;

import java.time.LocalDateTime;

@Data
@Builder
@NoArgsConstructor
@AllArgsConstructor
public class WithdrawalRow {

private LocalDateTime eventTime;
private String partyId;
private String withdrawalId;
private long sequenceId;
private LocalDateTime withdrawalTime;
private String walletId;
private String destinationId;
private String providerId;
private String terminal;
private long amount;
private long systemFee;
private long providerFee;
private String currency;
private WithdrawalStatus status;

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
package dev.vality.analytics.dao.model;

import lombok.AllArgsConstructor;
import lombok.Builder;
import lombok.Data;
import lombok.NoArgsConstructor;

import java.time.LocalDateTime;

@Data
@Builder(toBuilder = true)
@NoArgsConstructor
@AllArgsConstructor
public class WithdrawalStateSnapshot {

private String withdrawalId;
private String partyId;
private String walletId;
private String destinationId;
private String currency;
private Long requestedAmount;
private Long amount;
private Long systemFee;
private Long providerFee;
private LocalDateTime withdrawalCreatedAt;
private String providerId;
private String terminal;
private long lastSequenceId;
private LocalDateTime updatedAt;

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,63 @@
package dev.vality.analytics.dao.repository.clickhouse;

import dev.vality.analytics.constant.ClickHouseUtilsValue;
import dev.vality.analytics.dao.model.WithdrawalRow;
import org.springframework.jdbc.core.BatchPreparedStatementSetter;

import java.sql.PreparedStatement;
import java.sql.SQLException;
import java.time.ZoneOffset;
import java.time.temporal.ChronoUnit;
import java.util.List;

public class ClickHouseWithdrawalBatchPreparedStatementSetter implements BatchPreparedStatementSetter {

public static final String INSERT = "INSERT INTO analytic.events_sink_withdrawal " +
"(timestamp, eventTime, eventTimeHour, partyId, withdrawalId, sequenceId, withdrawalTime, " +
"walletId, " +
"destinationId, providerId, terminal, amount, systemFee, providerFee, " +
"currency, status) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)";

private final List<WithdrawalRow> batch;

public ClickHouseWithdrawalBatchPreparedStatementSetter(List<WithdrawalRow> batch) {
this.batch = batch;
}

@Override
public void setValues(PreparedStatement ps, int i) throws SQLException {
WithdrawalRow row = batch.get(i);
int column = 1;
ps.setObject(column++, row.getEventTime().toLocalDate());
ps.setLong(column++, row.getEventTime().toEpochSecond(ZoneOffset.UTC));
ps.setLong(
column++,
row.getEventTime().toInstant(ZoneOffset.UTC).truncatedTo(ChronoUnit.HOURS).toEpochMilli());
ps.setString(column++, defaultString(row.getPartyId()));
ps.setString(column++, defaultString(row.getWithdrawalId()));
ps.setLong(column++, row.getSequenceId());
ps.setLong(column++, row.getWithdrawalTime().toEpochSecond(ZoneOffset.UTC));
ps.setString(column++, defaultString(row.getWalletId()));
ps.setString(column++, defaultString(row.getDestinationId()));
ps.setString(column++, defaultString(row.getProviderId()));
ps.setString(column++, defaultString(row.getTerminal()));
ps.setLong(column++, safeUnsigned(row.getAmount()));
ps.setLong(column++, safeUnsigned(row.getSystemFee()));
ps.setLong(column++, safeUnsigned(row.getProviderFee()));
ps.setString(column++, defaultString(row.getCurrency()));
ps.setString(column, row.getStatus().name());
}

@Override
public int getBatchSize() {
return batch.size();
}

private String defaultString(String value) {
return value != null ? value : ClickHouseUtilsValue.UNKNOWN;
}

private long safeUnsigned(long value) {
return Math.max(0L, value);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
package dev.vality.analytics.dao.repository.clickhouse;

import dev.vality.analytics.dao.model.WithdrawalRow;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.jdbc.core.JdbcTemplate;
import org.springframework.retry.annotation.Backoff;
import org.springframework.retry.annotation.Retryable;
import org.springframework.stereotype.Service;
import org.springframework.util.CollectionUtils;

import java.sql.SQLException;
import java.util.List;

@Slf4j
@Service
@RequiredArgsConstructor
public class ClickHouseWithdrawalRepository {

private final JdbcTemplate clickHouseJdbcTemplate;

@Retryable(retryFor = SQLException.class, backoff = @Backoff(delay = 5000))
public void insertBatch(List<WithdrawalRow> withdrawalRows) {
if (CollectionUtils.isEmpty(withdrawalRows)) {
return;
}

log.info("Batch start insert withdrawalRows: {} firstElement: {}",
withdrawalRows.size(), withdrawalRows.get(0).getWithdrawalId());
clickHouseJdbcTemplate.batchUpdate(
ClickHouseWithdrawalBatchPreparedStatementSetter.INSERT,
new ClickHouseWithdrawalBatchPreparedStatementSetter(withdrawalRows));
log.info("Batch inserted withdrawalRows: {} firstElement: {}",
withdrawalRows.size(), withdrawalRows.get(0).getWithdrawalId());
}
}
Loading
Loading