diff --git a/.circleci/config.yml b/.circleci/config.yml
index 6db434c..6ef17a4 100644
--- a/.circleci/config.yml
+++ b/.circleci/config.yml
@@ -8,12 +8,15 @@ jobs:
- checkout
- run:
name: Analyze on SonarCloud
- command: mvn verify sonar:sonar
+ command: mvn verify sonar:sonar -DskipTests
executors:
- jdk:
- docker:
- - image: 'cimg/openjdk:21.0.9'
+ vm:
+ machine:
+ image: ubuntu-2204:current
+ environment:
+ architecture: "amd64"
+ platform: "linux/amd64"
orbs:
maven: circleci/maven@2.1.1
@@ -22,6 +25,6 @@ workflows:
maven_test:
jobs:
- maven/test:
- executor: jdk
+ executor: vm
- build:
context: SonarCloud
\ No newline at end of file
diff --git a/pom.xml b/pom.xml
index 4ddc5e2..363a4c7 100644
--- a/pom.xml
+++ b/pom.xml
@@ -26,26 +26,42 @@
org.springframework.boot
spring-boot-starter-web
-
- org.projectlombok
- lombok
- 1.18.42
-
io.kubemq.sdk
kubemq-sdk-Java
- 1.0.5
+ 2.1.1
io.grpc
grpc-stub
1.79.0
+
+ com.fasterxml.jackson.core
+ jackson-databind
+
org.springframework.boot
spring-boot-starter-test
test
+
+ org.testcontainers
+ testcontainers
+ 1.20.4
+ test
+
+
+ org.testcontainers
+ junit-jupiter
+ 1.20.4
+ test
+
+
+ org.awaitility
+ awaitility
+ test
+
diff --git a/src/main/java/pl/piomin/service/kubemq/config/KubeMQConfiguration.java b/src/main/java/pl/piomin/service/kubemq/config/KubeMQConfiguration.java
index 6f7a958..2953adc 100644
--- a/src/main/java/pl/piomin/service/kubemq/config/KubeMQConfiguration.java
+++ b/src/main/java/pl/piomin/service/kubemq/config/KubeMQConfiguration.java
@@ -1,16 +1,11 @@
package pl.piomin.service.kubemq.config;
-import io.kubemq.sdk.basic.ServerAddressNotSuppliedException;
-import io.kubemq.sdk.event.Channel;
-import io.kubemq.sdk.event.Subscriber;
-import io.kubemq.sdk.queue.Queue;
-
+import io.kubemq.sdk.pubsub.PubSubClient;
+import io.kubemq.sdk.queues.QueuesClient;
import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
-import javax.net.ssl.SSLException;
-
@Configuration
@ConfigurationProperties("kubemq")
public class KubeMQConfiguration {
@@ -18,18 +13,19 @@ public class KubeMQConfiguration {
private String address;
@Bean
- public Queue queue() throws ServerAddressNotSuppliedException, SSLException {
- return new Queue("transactions", "orders", address);
- }
-
- @Bean
- public Subscriber subscriber() {
- return new Subscriber(address);
+ public QueuesClient queuesClient() {
+ return QueuesClient.builder()
+ .address(address)
+ .clientId("orders-service-queues")
+ .build();
}
@Bean
- public Channel channel() {
- return new Channel("transactions", "orders", true, address);
+ public PubSubClient pubSubClient() {
+ return PubSubClient.builder()
+ .address(address)
+ .clientId("orders-service-pubsub")
+ .build();
}
String getAddress() {
diff --git a/src/main/java/pl/piomin/service/kubemq/controller/OrderController.java b/src/main/java/pl/piomin/service/kubemq/controller/OrderController.java
index 252718b..0c52bc7 100644
--- a/src/main/java/pl/piomin/service/kubemq/controller/OrderController.java
+++ b/src/main/java/pl/piomin/service/kubemq/controller/OrderController.java
@@ -1,10 +1,8 @@
package pl.piomin.service.kubemq.controller;
-import io.kubemq.sdk.basic.ServerAddressNotSuppliedException;
-import io.kubemq.sdk.queue.Message;
-import io.kubemq.sdk.queue.Queue;
-import io.kubemq.sdk.queue.SendMessageResult;
-import io.kubemq.sdk.tools.Converter;
+import io.kubemq.sdk.queues.QueuesClient;
+import io.kubemq.sdk.queues.QueueMessage;
+import io.kubemq.sdk.queues.QueueSendResult;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.web.bind.annotation.PostMapping;
@@ -14,30 +12,37 @@
import pl.piomin.service.kubemq.model.Order;
import pl.piomin.service.kubemq.model.OrderStatus;
-import java.io.IOException;
+import com.fasterxml.jackson.databind.ObjectMapper;
@RestController
@RequestMapping("/orders")
public class OrderController {
private static final Logger LOGGER = LoggerFactory.getLogger(OrderController.class);
+ private static final ObjectMapper objectMapper = new ObjectMapper();
- private Queue queue;
+ private QueuesClient queuesClient;
- public OrderController(Queue queue) {
- this.queue = queue;
+ public OrderController(QueuesClient queuesClient) {
+ this.queuesClient = queuesClient;
}
@PostMapping
public Order sendOrder(@RequestBody Order order) {
try {
LOGGER.info("Sending: {}", order);
- final SendMessageResult result = queue.SendQueueMessage(new Message()
- .setBody(Converter.ToByteArray(order)));
- order.setId(result.getMessageID());
+ byte[] orderBytes = objectMapper.writeValueAsBytes(order);
+
+ QueueMessage message = QueueMessage.builder()
+ .channel("transactions")
+ .body(orderBytes)
+ .build();
+
+ QueueSendResult result = queuesClient.sendQueuesMessage(message);
+ order.setId(result.getId());
order.setStatus(OrderStatus.ACCEPTED);
LOGGER.info("Sent: {}", order);
- } catch (ServerAddressNotSuppliedException | IOException e) {
+ } catch (Exception e) {
LOGGER.error("Error sending", e);
order.setStatus(OrderStatus.ERROR);
}
diff --git a/src/main/java/pl/piomin/service/kubemq/listener/OrderListener.java b/src/main/java/pl/piomin/service/kubemq/listener/OrderListener.java
index 0c72fbc..f9aae74 100644
--- a/src/main/java/pl/piomin/service/kubemq/listener/OrderListener.java
+++ b/src/main/java/pl/piomin/service/kubemq/listener/OrderListener.java
@@ -1,11 +1,11 @@
package pl.piomin.service.kubemq.listener;
-import io.kubemq.sdk.event.Channel;
-import io.kubemq.sdk.event.Event;
-import io.kubemq.sdk.queue.Queue;
-import io.kubemq.sdk.queue.Transaction;
-import io.kubemq.sdk.queue.TransactionMessagesResponse;
-import io.kubemq.sdk.tools.Converter;
+import io.kubemq.sdk.pubsub.PubSubClient;
+import io.kubemq.sdk.pubsub.EventMessage;
+import io.kubemq.sdk.queues.QueuesClient;
+import io.kubemq.sdk.queues.QueuesPollRequest;
+import io.kubemq.sdk.queues.QueuesPollResponse;
+import io.kubemq.sdk.queues.QueueMessageReceived;
import jakarta.annotation.PostConstruct;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -15,19 +15,23 @@
import pl.piomin.service.kubemq.model.OrderStatus;
import pl.piomin.service.kubemq.service.OrderProcessor;
+import com.fasterxml.jackson.databind.ObjectMapper;
+
@Component
public class OrderListener {
private static final Logger LOGGER = LoggerFactory.getLogger(OrderListener.class);
+ private static final ObjectMapper objectMapper = new ObjectMapper();
- private Queue queue;
- private Channel channel;
+ private QueuesClient queuesClient;
+ private PubSubClient pubSubClient;
private OrderProcessor orderProcessor;
private TaskExecutor taskExecutor;
- public OrderListener(Queue queue, Channel channel, OrderProcessor orderProcessor, TaskExecutor taskExecutor) {
- this.queue = queue;
- this.channel = channel;
+ public OrderListener(QueuesClient queuesClient, PubSubClient pubSubClient,
+ OrderProcessor orderProcessor, TaskExecutor taskExecutor) {
+ this.queuesClient = queuesClient;
+ this.pubSubClient = pubSubClient;
this.orderProcessor = orderProcessor;
this.taskExecutor = taskExecutor;
}
@@ -37,21 +41,40 @@ public void listen() {
taskExecutor.execute(() -> {
while (true) {
try {
- Transaction transaction = queue.CreateTransaction();
- TransactionMessagesResponse response = transaction.Receive(10, 10);
- if (response.getMessage().getBody().length > 0) {
- Order order = orderProcessor
- .process((Order) Converter.FromByteArray(response.getMessage().getBody()));
+ QueuesPollRequest pollRequest = QueuesPollRequest.builder()
+ .channel("transactions")
+ .pollMaxMessages(1)
+ .pollWaitTimeoutInSeconds(10)
+ .build();
+
+ QueuesPollResponse response = queuesClient.receiveQueuesMessages(pollRequest);
+
+ if (response.isError()) {
+ LOGGER.error("Error receiving message: {}", response.getError());
+ Thread.sleep(10000);
+ continue;
+ }
+
+ if (!response.getMessages().isEmpty()) {
+ QueueMessageReceived message = response.getMessages().get(0);
+ Order order = objectMapper.readValue(message.getBody(), Order.class);
+ order = orderProcessor.process(order);
LOGGER.info("Processed: {}", order);
+
if (order.getStatus().equals(OrderStatus.CONFIRMED)) {
- transaction.AckMessage();
- Event event = new Event();
- event.setEventId(response.getMessage().getMessageID());
- event.setBody(Converter.ToByteArray(order));
- LOGGER.info("Sending event: id={}", event.getEventId());
- channel.SendEvent(event);
+ message.ack();
+
+ byte[] eventBody = objectMapper.writeValueAsBytes(order);
+ EventMessage event = EventMessage.builder()
+ .channel("transactions")
+ .body(eventBody)
+ .id(message.getId())
+ .build();
+
+ LOGGER.info("Sending event: id={}", message.getId());
+ pubSubClient.sendEventsMessage(event);
} else {
- transaction.RejectMessage();
+ message.reject();
}
} else {
LOGGER.info("No messages");
diff --git a/src/main/java/pl/piomin/service/kubemq/listener/TransactionAmountListener.java b/src/main/java/pl/piomin/service/kubemq/listener/TransactionAmountListener.java
index 2645e23..b63fd5f 100644
--- a/src/main/java/pl/piomin/service/kubemq/listener/TransactionAmountListener.java
+++ b/src/main/java/pl/piomin/service/kubemq/listener/TransactionAmountListener.java
@@ -1,13 +1,8 @@
package pl.piomin.service.kubemq.listener;
-import io.grpc.stub.StreamObserver;
-import io.kubemq.sdk.basic.ServerAddressNotSuppliedException;
-import io.kubemq.sdk.event.EventReceive;
-import io.kubemq.sdk.event.Subscriber;
-import io.kubemq.sdk.subscription.EventsStoreType;
-import io.kubemq.sdk.subscription.SubscribeRequest;
-import io.kubemq.sdk.subscription.SubscribeType;
-import io.kubemq.sdk.tools.Converter;
+import io.kubemq.sdk.pubsub.PubSubClient;
+import io.kubemq.sdk.pubsub.EventsStoreSubscription;
+import io.kubemq.sdk.pubsub.EventsStoreType;
import jakarta.annotation.PostConstruct;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -16,54 +11,48 @@
import pl.piomin.service.kubemq.model.Order;
import pl.piomin.service.kubemq.repository.AccountRepository;
-import javax.net.ssl.SSLException;
-import java.io.IOException;
+import com.fasterxml.jackson.databind.ObjectMapper;
@Component
-public class TransactionAmountListener implements StreamObserver {
+public class TransactionAmountListener {
private static final Logger LOGGER = LoggerFactory.getLogger(TransactionAmountListener.class);
+ private static final ObjectMapper objectMapper = new ObjectMapper();
- private Subscriber subscriber;
+ private PubSubClient pubSubClient;
private AccountRepository accountRepository;
- public TransactionAmountListener(Subscriber subscriber, AccountRepository accountRepository) {
- this.subscriber = subscriber;
+ public TransactionAmountListener(PubSubClient pubSubClient, AccountRepository accountRepository) {
+ this.pubSubClient = pubSubClient;
this.accountRepository = accountRepository;
}
- @Override
- public void onNext(EventReceive eventReceive) {
- try {
- Order order = (Order) Converter.FromByteArray(eventReceive.getBody());
- LOGGER.info("Amount event: {}", order);
- accountRepository.updateBalance(order.getAccountIdTo(), (int) (order.getAmount() * 0.1));
- } catch (IOException | ClassNotFoundException | InsufficientFundsException e) {
- LOGGER.error("Error", e);
- }
- }
-
- @Override
- public void onError(Throwable throwable) {
-
- }
-
- @Override
- public void onCompleted() {
-
- }
-
@PostConstruct
public void init() {
- SubscribeRequest subscribeRequest = new SubscribeRequest();
- subscribeRequest.setChannel("transactions");
- subscribeRequest.setClientID("amount-listener");
- subscribeRequest.setSubscribeType(SubscribeType.EventsStore);
- subscribeRequest.setEventsStoreType(EventsStoreType.StartNewOnly);
try {
- subscriber.SubscribeToEvents(subscribeRequest, this);
- } catch (ServerAddressNotSuppliedException | SSLException e) {
- e.printStackTrace();
+ EventsStoreSubscription subscription = EventsStoreSubscription.builder()
+ .channel("transactions")
+ .group("")
+ .eventsStoreType(EventsStoreType.StartNewOnly)
+ .onReceiveEventCallback(event -> {
+ try {
+ Order order = objectMapper.readValue(event.getBody(), Order.class);
+ LOGGER.info("Amount event: {}", order);
+ accountRepository.updateBalance(order.getAccountIdTo(), (int) (order.getAmount() * 0.1));
+ } catch (InsufficientFundsException e) {
+ LOGGER.error("Error", e);
+ } catch (Exception e) {
+ LOGGER.error("Error processing event", e);
+ }
+ })
+ .onErrorCallback(error -> {
+ LOGGER.error("Subscription error", error);
+ })
+ .build();
+
+ pubSubClient.subscribeToEventsStore(subscription);
+ } catch (Exception e) {
+ LOGGER.error("Error initializing subscription", e);
}
}
}
diff --git a/src/main/java/pl/piomin/service/kubemq/listener/TransactionCountListener.java b/src/main/java/pl/piomin/service/kubemq/listener/TransactionCountListener.java
index 78b46b1..d73acfd 100644
--- a/src/main/java/pl/piomin/service/kubemq/listener/TransactionCountListener.java
+++ b/src/main/java/pl/piomin/service/kubemq/listener/TransactionCountListener.java
@@ -1,13 +1,8 @@
package pl.piomin.service.kubemq.listener;
-import io.grpc.stub.StreamObserver;
-import io.kubemq.sdk.basic.ServerAddressNotSuppliedException;
-import io.kubemq.sdk.event.EventReceive;
-import io.kubemq.sdk.event.Subscriber;
-import io.kubemq.sdk.subscription.EventsStoreType;
-import io.kubemq.sdk.subscription.SubscribeRequest;
-import io.kubemq.sdk.subscription.SubscribeType;
-import io.kubemq.sdk.tools.Converter;
+import io.kubemq.sdk.pubsub.PubSubClient;
+import io.kubemq.sdk.pubsub.EventsStoreSubscription;
+import io.kubemq.sdk.pubsub.EventsStoreType;
import jakarta.annotation.PostConstruct;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -16,67 +11,61 @@
import pl.piomin.service.kubemq.model.Order;
import pl.piomin.service.kubemq.repository.AccountRepository;
-import javax.net.ssl.SSLException;
-import java.io.IOException;
+import com.fasterxml.jackson.databind.ObjectMapper;
import java.util.HashMap;
import java.util.Map;
@Component
-public class TransactionCountListener implements StreamObserver {
+public class TransactionCountListener {
private static final Logger LOGGER = LoggerFactory.getLogger(TransactionCountListener.class);
+ private static final ObjectMapper objectMapper = new ObjectMapper();
private Map transactionsCount = new HashMap<>();
- private Subscriber subscriber;
+ private PubSubClient pubSubClient;
private AccountRepository accountRepository;
- public TransactionCountListener(Subscriber subscriber, AccountRepository accountRepository) {
- this.subscriber = subscriber;
+ public TransactionCountListener(PubSubClient pubSubClient, AccountRepository accountRepository) {
+ this.pubSubClient = pubSubClient;
this.accountRepository = accountRepository;
}
- @Override
- public void onNext(EventReceive eventReceive) {
- try {
- Order order = (Order) Converter.FromByteArray(eventReceive.getBody());
- LOGGER.info("Count event: {}", order);
- Integer accountIdTo = order.getAccountIdTo();
- Integer noOfTransactions = transactionsCount.get(accountIdTo);
- if (noOfTransactions == null)
- transactionsCount.put(accountIdTo, 1);
- else {
- transactionsCount.put(accountIdTo, ++noOfTransactions);
- if (noOfTransactions > 5) {
- accountRepository.updateBalance(order.getAccountIdTo(), (int) (order.getAmount() * 0.1));
- LOGGER.info("Adding extra to: id={}", order.getAccountIdTo());
- }
- }
- } catch (IOException | ClassNotFoundException | InsufficientFundsException e) {
- LOGGER.error("Error", e);
- }
- }
-
- @Override
- public void onError(Throwable throwable) {
-
- }
-
- @Override
- public void onCompleted() {
-
- }
-
@PostConstruct
public void init() {
- final SubscribeRequest subscribeRequest = new SubscribeRequest();
- subscribeRequest.setChannel("transactions");
- subscribeRequest.setClientID("count-listener-" + System.currentTimeMillis());
- subscribeRequest.setSubscribeType(SubscribeType.EventsStore);
- subscribeRequest.setEventsStoreType(EventsStoreType.StartFromFirst);
try {
- subscriber.SubscribeToEvents(subscribeRequest, this);
- } catch (ServerAddressNotSuppliedException | SSLException e) {
- e.printStackTrace();
+ EventsStoreSubscription subscription = EventsStoreSubscription.builder()
+ .channel("transactions")
+ .group("")
+ .eventsStoreType(EventsStoreType.StartFromFirst)
+ .onReceiveEventCallback(event -> {
+ try {
+ Order order = objectMapper.readValue(event.getBody(), Order.class);
+ LOGGER.info("Count event: {}", order);
+ Integer accountIdTo = order.getAccountIdTo();
+ Integer noOfTransactions = transactionsCount.get(accountIdTo);
+ if (noOfTransactions == null)
+ transactionsCount.put(accountIdTo, 1);
+ else {
+ transactionsCount.put(accountIdTo, ++noOfTransactions);
+ if (noOfTransactions > 5) {
+ accountRepository.updateBalance(order.getAccountIdTo(), (int) (order.getAmount() * 0.1));
+ LOGGER.info("Adding extra to: id={}", order.getAccountIdTo());
+ }
+ }
+ } catch (InsufficientFundsException e) {
+ LOGGER.error("Error", e);
+ } catch (Exception e) {
+ LOGGER.error("Error processing event", e);
+ }
+ })
+ .onErrorCallback(error -> {
+ LOGGER.error("Subscription error", error);
+ })
+ .build();
+
+ pubSubClient.subscribeToEventsStore(subscription);
+ } catch (Exception e) {
+ LOGGER.error("Error initializing subscription", e);
}
}
diff --git a/src/main/java/pl/piomin/service/kubemq/model/Account.java b/src/main/java/pl/piomin/service/kubemq/model/Account.java
index 23ef793..7fe9332 100644
--- a/src/main/java/pl/piomin/service/kubemq/model/Account.java
+++ b/src/main/java/pl/piomin/service/kubemq/model/Account.java
@@ -1,22 +1,65 @@
package pl.piomin.service.kubemq.model;
-import lombok.AllArgsConstructor;
-import lombok.EqualsAndHashCode;
-import lombok.Getter;
-import lombok.NoArgsConstructor;
-import lombok.Setter;
-import lombok.ToString;
-
-@Getter
-@Setter
-@NoArgsConstructor
-@AllArgsConstructor
-@EqualsAndHashCode
-@ToString
+import java.util.Objects;
+
public class Account {
private Integer id;
private String number;
- @EqualsAndHashCode.Exclude private int balance;
+ private int balance;
+
+ public Account() {
+ }
+
+ public Account(Integer id, String number, int balance) {
+ this.id = id;
+ this.number = number;
+ this.balance = balance;
+ }
+
+ public Integer getId() {
+ return id;
+ }
+
+ public void setId(Integer id) {
+ this.id = id;
+ }
+
+ public String getNumber() {
+ return number;
+ }
+
+ public void setNumber(String number) {
+ this.number = number;
+ }
+
+ public int getBalance() {
+ return balance;
+ }
+
+ public void setBalance(int balance) {
+ this.balance = balance;
+ }
+
+ @Override
+ public boolean equals(Object o) {
+ if (this == o) return true;
+ if (o == null || getClass() != o.getClass()) return false;
+ Account account = (Account) o;
+ return Objects.equals(id, account.id) && Objects.equals(number, account.number);
+ }
+
+ @Override
+ public int hashCode() {
+ return Objects.hash(id, number);
+ }
+ @Override
+ public String toString() {
+ return "Account{" +
+ "id=" + id +
+ ", number='" + number + '\'' +
+ ", balance=" + balance +
+ '}';
+ }
}
diff --git a/src/main/java/pl/piomin/service/kubemq/model/Order.java b/src/main/java/pl/piomin/service/kubemq/model/Order.java
index fb3b961..71a97de 100644
--- a/src/main/java/pl/piomin/service/kubemq/model/Order.java
+++ b/src/main/java/pl/piomin/service/kubemq/model/Order.java
@@ -1,17 +1,8 @@
package pl.piomin.service.kubemq.model;
-import lombok.Builder;
-import lombok.Getter;
-import lombok.Setter;
-import lombok.ToString;
-
import java.io.Serializable;
import java.time.LocalDateTime;
-@Getter
-@Setter
-@Builder
-@ToString
public class Order implements Serializable {
private OrderType type;
@@ -22,4 +13,85 @@ public class Order implements Serializable {
private String id;
private OrderStatus status;
+ public Order() {
+ }
+
+ public Order(OrderType type, Integer accountIdFrom, Integer accountIdTo, LocalDateTime date, int amount, String id, OrderStatus status) {
+ this.type = type;
+ this.accountIdFrom = accountIdFrom;
+ this.accountIdTo = accountIdTo;
+ this.date = date;
+ this.amount = amount;
+ this.id = id;
+ this.status = status;
+ }
+
+ public OrderType getType() {
+ return type;
+ }
+
+ public void setType(OrderType type) {
+ this.type = type;
+ }
+
+ public Integer getAccountIdFrom() {
+ return accountIdFrom;
+ }
+
+ public void setAccountIdFrom(Integer accountIdFrom) {
+ this.accountIdFrom = accountIdFrom;
+ }
+
+ public Integer getAccountIdTo() {
+ return accountIdTo;
+ }
+
+ public void setAccountIdTo(Integer accountIdTo) {
+ this.accountIdTo = accountIdTo;
+ }
+
+ public LocalDateTime getDate() {
+ return date;
+ }
+
+ public void setDate(LocalDateTime date) {
+ this.date = date;
+ }
+
+ public int getAmount() {
+ return amount;
+ }
+
+ public void setAmount(int amount) {
+ this.amount = amount;
+ }
+
+ public String getId() {
+ return id;
+ }
+
+ public void setId(String id) {
+ this.id = id;
+ }
+
+ public OrderStatus getStatus() {
+ return status;
+ }
+
+ public void setStatus(OrderStatus status) {
+ this.status = status;
+ }
+
+ @Override
+ public String toString() {
+ return "Order{" +
+ "type=" + type +
+ ", accountIdFrom=" + accountIdFrom +
+ ", accountIdTo=" + accountIdTo +
+ ", date=" + date +
+ ", amount=" + amount +
+ ", id='" + id + '\'' +
+ ", status=" + status +
+ '}';
+ }
}
diff --git a/src/test/java/pl/piomin/service/kubemq/KubeMQIntegrationTest.java b/src/test/java/pl/piomin/service/kubemq/KubeMQIntegrationTest.java
new file mode 100644
index 0000000..c0b216e
--- /dev/null
+++ b/src/test/java/pl/piomin/service/kubemq/KubeMQIntegrationTest.java
@@ -0,0 +1,290 @@
+package pl.piomin.service.kubemq;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.fasterxml.jackson.databind.SerializationFeature;
+import com.fasterxml.jackson.datatype.jsr310.JavaTimeModule;
+import io.kubemq.sdk.pubsub.EventMessage;
+import io.kubemq.sdk.pubsub.EventSendResult;
+import io.kubemq.sdk.pubsub.PubSubClient;
+import io.kubemq.sdk.queues.*;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.testcontainers.containers.GenericContainer;
+import org.testcontainers.containers.wait.strategy.Wait;
+import org.testcontainers.junit.jupiter.Container;
+import org.testcontainers.junit.jupiter.Testcontainers;
+import pl.piomin.service.kubemq.model.Order;
+import pl.piomin.service.kubemq.model.OrderStatus;
+import pl.piomin.service.kubemq.model.OrderType;
+
+import java.time.LocalDateTime;
+
+import static org.junit.jupiter.api.Assertions.*;
+
+@Testcontainers
+class KubeMQIntegrationTest {
+
+ private static final Logger LOGGER = LoggerFactory.getLogger(KubeMQIntegrationTest.class);
+ private static final ObjectMapper objectMapper = new ObjectMapper()
+ .registerModule(new JavaTimeModule())
+ .disable(SerializationFeature.WRITE_DATES_AS_TIMESTAMPS);
+
+ @Container
+ static GenericContainer> kubemqContainer = new GenericContainer<>("kubemq/kubemq-community:latest")
+ .withExposedPorts(50000, 8080, 9090)
+ .withEnv("KUBEMQ_TOKEN", "")
+ .withEnv("KUBEMQ_LICENSE", "")
+ .waitingFor(Wait.forListeningPort())
+ .withStartupTimeout(java.time.Duration.ofMinutes(2));
+
+ private QueuesClient queuesClient;
+ private PubSubClient pubSubClient;
+ private String kubemqAddress;
+
+ @BeforeEach
+ void setUp() {
+ kubemqAddress = kubemqContainer.getHost() + ":" + kubemqContainer.getMappedPort(50000);
+ LOGGER.info("KubeMQ container started at: {}", kubemqAddress);
+
+ queuesClient = QueuesClient.builder()
+ .address(kubemqAddress)
+ .clientId("test-queues-client")
+ .build();
+
+ pubSubClient = PubSubClient.builder()
+ .address(kubemqAddress)
+ .clientId("test-pubsub-client")
+ .build();
+ }
+
+ @AfterEach
+ void tearDown() {
+ if (queuesClient != null) {
+ try {
+ queuesClient.close();
+ } catch (Exception e) {
+ LOGGER.warn("Error closing queues client", e);
+ }
+ }
+ if (pubSubClient != null) {
+ try {
+ pubSubClient.close();
+ } catch (Exception e) {
+ LOGGER.warn("Error closing pubsub client", e);
+ }
+ }
+ }
+
+ @Test
+ void shouldSendAndReceiveQueueMessage() throws Exception {
+ // Given
+ String testChannel = "test-transactions";
+ Order testOrder = new Order(
+ OrderType.TRANSFER,
+ 1,
+ 2,
+ LocalDateTime.now(),
+ 100,
+ null,
+ OrderStatus.ACCEPTED
+ );
+
+ byte[] orderBytes = objectMapper.writeValueAsBytes(testOrder);
+
+ // When - Send message
+ QueueMessage message = QueueMessage.builder()
+ .channel(testChannel)
+ .body(orderBytes)
+ .metadata("test-message")
+ .build();
+
+ QueueSendResult sendResult = queuesClient.sendQueuesMessage(message);
+
+ // Then - Verify send
+ assertNotNull(sendResult);
+ assertNotNull(sendResult.getId());
+ assertFalse(sendResult.isError());
+ LOGGER.info("Message sent with ID: {}", sendResult.getId());
+
+ // When - Receive message
+ QueuesPollRequest pollRequest = QueuesPollRequest.builder()
+ .channel(testChannel)
+ .pollMaxMessages(1)
+ .pollWaitTimeoutInSeconds(10)
+ .build();
+
+ QueuesPollResponse pollResponse = queuesClient.receiveQueuesMessages(pollRequest);
+
+ // Then - Verify receive
+ assertNotNull(pollResponse);
+ assertFalse(pollResponse.isError());
+ assertFalse(pollResponse.getMessages().isEmpty());
+
+ QueueMessageReceived receivedMessage = pollResponse.getMessages().get(0);
+ assertNotNull(receivedMessage);
+ assertNotNull(receivedMessage.getBody());
+
+ Order receivedOrder = objectMapper.readValue(receivedMessage.getBody(), Order.class);
+ assertEquals(testOrder.getAccountIdFrom(), receivedOrder.getAccountIdFrom());
+ assertEquals(testOrder.getAccountIdTo(), receivedOrder.getAccountIdTo());
+ assertEquals(testOrder.getAmount(), receivedOrder.getAmount());
+ assertEquals(testOrder.getType(), receivedOrder.getType());
+
+ LOGGER.info("Message received: {}", receivedOrder);
+
+ // Acknowledge the message
+ receivedMessage.ack();
+ }
+
+ @Test
+ void shouldSendAndReceiveMultipleQueueMessages() throws Exception {
+ // Given
+ String testChannel = "test-bulk-transactions";
+ int messageCount = 5;
+
+ // When - Send multiple messages
+ for (int i = 0; i < messageCount; i++) {
+ Order order = new Order(
+ OrderType.TRANSFER,
+ 1,
+ 2,
+ LocalDateTime.now(),
+ 100 + i,
+ null,
+ OrderStatus.ACCEPTED
+ );
+
+ byte[] orderBytes = objectMapper.writeValueAsBytes(order);
+ QueueMessage message = QueueMessage.builder()
+ .channel(testChannel)
+ .body(orderBytes)
+ .build();
+
+ QueueSendResult result = queuesClient.sendQueuesMessage(message);
+ assertFalse(result.isError());
+ }
+
+ LOGGER.info("Sent {} messages", messageCount);
+
+ // When - Receive messages
+ QueuesPollRequest pollRequest = QueuesPollRequest.builder()
+ .channel(testChannel)
+ .pollMaxMessages(messageCount)
+ .pollWaitTimeoutInSeconds(10)
+ .build();
+
+ QueuesPollResponse pollResponse = queuesClient.receiveQueuesMessages(pollRequest);
+
+ // Then
+ assertNotNull(pollResponse);
+ assertFalse(pollResponse.isError());
+ assertEquals(messageCount, pollResponse.getMessages().size());
+
+ LOGGER.info("Received {} messages", pollResponse.getMessages().size());
+
+ // Acknowledge all messages
+ for (QueueMessageReceived msg : pollResponse.getMessages()) {
+ msg.ack();
+ }
+ }
+
+ @Test
+ void shouldSendEventMessage() throws Exception {
+ // Given
+ String testChannel = "test-events";
+ Order testOrder = new Order(
+ OrderType.TRANSFER,
+ 1,
+ 2,
+ LocalDateTime.now(),
+ 200,
+ "test-event-id",
+ OrderStatus.CONFIRMED
+ );
+
+ byte[] orderBytes = objectMapper.writeValueAsBytes(testOrder);
+
+ // When
+ EventMessage eventMessage = EventMessage.builder()
+ .channel(testChannel)
+ .body(orderBytes)
+ .metadata("test-event")
+ .build();
+
+ // Send event - method returns void in SDK v2
+ pubSubClient.sendEventsMessage(eventMessage);
+
+ // Then - If no exception thrown, the event was sent successfully
+ LOGGER.info("Event sent successfully to channel: {}", testChannel);
+ }
+
+ @Test
+ void shouldRejectQueueMessage() throws Exception {
+ // Given
+ String testChannel = "test-reject";
+ Order testOrder = new Order(
+ OrderType.TRANSFER,
+ 1,
+ 2,
+ LocalDateTime.now(),
+ 300,
+ null,
+ OrderStatus.ACCEPTED
+ );
+
+ byte[] orderBytes = objectMapper.writeValueAsBytes(testOrder);
+
+ // When - Send message
+ QueueMessage message = QueueMessage.builder()
+ .channel(testChannel)
+ .body(orderBytes)
+ .build();
+
+ queuesClient.sendQueuesMessage(message);
+
+ // Receive message
+ QueuesPollRequest pollRequest = QueuesPollRequest.builder()
+ .channel(testChannel)
+ .pollMaxMessages(1)
+ .pollWaitTimeoutInSeconds(10)
+ .build();
+
+ QueuesPollResponse pollResponse = queuesClient.receiveQueuesMessages(pollRequest);
+ assertFalse(pollResponse.getMessages().isEmpty());
+
+ QueueMessageReceived receivedMessage = pollResponse.getMessages().get(0);
+
+ // Reject the message
+ receivedMessage.reject();
+ LOGGER.info("Message rejected successfully");
+
+ // The message should be available again after rejection
+ QueuesPollResponse secondPollResponse = queuesClient.receiveQueuesMessages(pollRequest);
+ assertFalse(secondPollResponse.isError());
+ // Note: Depending on KubeMQ configuration, the rejected message might be re-queued
+ }
+
+ @Test
+ void shouldHandleEmptyQueuePoll() throws Exception {
+ // Given
+ String emptyChannel = "empty-channel-" + System.currentTimeMillis();
+
+ // When
+ QueuesPollRequest pollRequest = QueuesPollRequest.builder()
+ .channel(emptyChannel)
+ .pollMaxMessages(1)
+ .pollWaitTimeoutInSeconds(2)
+ .build();
+
+ QueuesPollResponse pollResponse = queuesClient.receiveQueuesMessages(pollRequest);
+
+ // Then
+ assertNotNull(pollResponse);
+ assertFalse(pollResponse.isError());
+ assertTrue(pollResponse.getMessages().isEmpty());
+ LOGGER.info("Empty queue poll handled correctly");
+ }
+}