diff --git a/.circleci/config.yml b/.circleci/config.yml index 6db434c..6ef17a4 100644 --- a/.circleci/config.yml +++ b/.circleci/config.yml @@ -8,12 +8,15 @@ jobs: - checkout - run: name: Analyze on SonarCloud - command: mvn verify sonar:sonar + command: mvn verify sonar:sonar -DskipTests executors: - jdk: - docker: - - image: 'cimg/openjdk:21.0.9' + vm: + machine: + image: ubuntu-2204:current + environment: + architecture: "amd64" + platform: "linux/amd64" orbs: maven: circleci/maven@2.1.1 @@ -22,6 +25,6 @@ workflows: maven_test: jobs: - maven/test: - executor: jdk + executor: vm - build: context: SonarCloud \ No newline at end of file diff --git a/pom.xml b/pom.xml index 4ddc5e2..363a4c7 100644 --- a/pom.xml +++ b/pom.xml @@ -26,26 +26,42 @@ org.springframework.boot spring-boot-starter-web - - org.projectlombok - lombok - 1.18.42 - io.kubemq.sdk kubemq-sdk-Java - 1.0.5 + 2.1.1 io.grpc grpc-stub 1.79.0 + + com.fasterxml.jackson.core + jackson-databind + org.springframework.boot spring-boot-starter-test test + + org.testcontainers + testcontainers + 1.20.4 + test + + + org.testcontainers + junit-jupiter + 1.20.4 + test + + + org.awaitility + awaitility + test + diff --git a/src/main/java/pl/piomin/service/kubemq/config/KubeMQConfiguration.java b/src/main/java/pl/piomin/service/kubemq/config/KubeMQConfiguration.java index 6f7a958..2953adc 100644 --- a/src/main/java/pl/piomin/service/kubemq/config/KubeMQConfiguration.java +++ b/src/main/java/pl/piomin/service/kubemq/config/KubeMQConfiguration.java @@ -1,16 +1,11 @@ package pl.piomin.service.kubemq.config; -import io.kubemq.sdk.basic.ServerAddressNotSuppliedException; -import io.kubemq.sdk.event.Channel; -import io.kubemq.sdk.event.Subscriber; -import io.kubemq.sdk.queue.Queue; - +import io.kubemq.sdk.pubsub.PubSubClient; +import io.kubemq.sdk.queues.QueuesClient; import org.springframework.boot.context.properties.ConfigurationProperties; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; -import javax.net.ssl.SSLException; - @Configuration @ConfigurationProperties("kubemq") public class KubeMQConfiguration { @@ -18,18 +13,19 @@ public class KubeMQConfiguration { private String address; @Bean - public Queue queue() throws ServerAddressNotSuppliedException, SSLException { - return new Queue("transactions", "orders", address); - } - - @Bean - public Subscriber subscriber() { - return new Subscriber(address); + public QueuesClient queuesClient() { + return QueuesClient.builder() + .address(address) + .clientId("orders-service-queues") + .build(); } @Bean - public Channel channel() { - return new Channel("transactions", "orders", true, address); + public PubSubClient pubSubClient() { + return PubSubClient.builder() + .address(address) + .clientId("orders-service-pubsub") + .build(); } String getAddress() { diff --git a/src/main/java/pl/piomin/service/kubemq/controller/OrderController.java b/src/main/java/pl/piomin/service/kubemq/controller/OrderController.java index 252718b..0c52bc7 100644 --- a/src/main/java/pl/piomin/service/kubemq/controller/OrderController.java +++ b/src/main/java/pl/piomin/service/kubemq/controller/OrderController.java @@ -1,10 +1,8 @@ package pl.piomin.service.kubemq.controller; -import io.kubemq.sdk.basic.ServerAddressNotSuppliedException; -import io.kubemq.sdk.queue.Message; -import io.kubemq.sdk.queue.Queue; -import io.kubemq.sdk.queue.SendMessageResult; -import io.kubemq.sdk.tools.Converter; +import io.kubemq.sdk.queues.QueuesClient; +import io.kubemq.sdk.queues.QueueMessage; +import io.kubemq.sdk.queues.QueueSendResult; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.web.bind.annotation.PostMapping; @@ -14,30 +12,37 @@ import pl.piomin.service.kubemq.model.Order; import pl.piomin.service.kubemq.model.OrderStatus; -import java.io.IOException; +import com.fasterxml.jackson.databind.ObjectMapper; @RestController @RequestMapping("/orders") public class OrderController { private static final Logger LOGGER = LoggerFactory.getLogger(OrderController.class); + private static final ObjectMapper objectMapper = new ObjectMapper(); - private Queue queue; + private QueuesClient queuesClient; - public OrderController(Queue queue) { - this.queue = queue; + public OrderController(QueuesClient queuesClient) { + this.queuesClient = queuesClient; } @PostMapping public Order sendOrder(@RequestBody Order order) { try { LOGGER.info("Sending: {}", order); - final SendMessageResult result = queue.SendQueueMessage(new Message() - .setBody(Converter.ToByteArray(order))); - order.setId(result.getMessageID()); + byte[] orderBytes = objectMapper.writeValueAsBytes(order); + + QueueMessage message = QueueMessage.builder() + .channel("transactions") + .body(orderBytes) + .build(); + + QueueSendResult result = queuesClient.sendQueuesMessage(message); + order.setId(result.getId()); order.setStatus(OrderStatus.ACCEPTED); LOGGER.info("Sent: {}", order); - } catch (ServerAddressNotSuppliedException | IOException e) { + } catch (Exception e) { LOGGER.error("Error sending", e); order.setStatus(OrderStatus.ERROR); } diff --git a/src/main/java/pl/piomin/service/kubemq/listener/OrderListener.java b/src/main/java/pl/piomin/service/kubemq/listener/OrderListener.java index 0c72fbc..f9aae74 100644 --- a/src/main/java/pl/piomin/service/kubemq/listener/OrderListener.java +++ b/src/main/java/pl/piomin/service/kubemq/listener/OrderListener.java @@ -1,11 +1,11 @@ package pl.piomin.service.kubemq.listener; -import io.kubemq.sdk.event.Channel; -import io.kubemq.sdk.event.Event; -import io.kubemq.sdk.queue.Queue; -import io.kubemq.sdk.queue.Transaction; -import io.kubemq.sdk.queue.TransactionMessagesResponse; -import io.kubemq.sdk.tools.Converter; +import io.kubemq.sdk.pubsub.PubSubClient; +import io.kubemq.sdk.pubsub.EventMessage; +import io.kubemq.sdk.queues.QueuesClient; +import io.kubemq.sdk.queues.QueuesPollRequest; +import io.kubemq.sdk.queues.QueuesPollResponse; +import io.kubemq.sdk.queues.QueueMessageReceived; import jakarta.annotation.PostConstruct; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -15,19 +15,23 @@ import pl.piomin.service.kubemq.model.OrderStatus; import pl.piomin.service.kubemq.service.OrderProcessor; +import com.fasterxml.jackson.databind.ObjectMapper; + @Component public class OrderListener { private static final Logger LOGGER = LoggerFactory.getLogger(OrderListener.class); + private static final ObjectMapper objectMapper = new ObjectMapper(); - private Queue queue; - private Channel channel; + private QueuesClient queuesClient; + private PubSubClient pubSubClient; private OrderProcessor orderProcessor; private TaskExecutor taskExecutor; - public OrderListener(Queue queue, Channel channel, OrderProcessor orderProcessor, TaskExecutor taskExecutor) { - this.queue = queue; - this.channel = channel; + public OrderListener(QueuesClient queuesClient, PubSubClient pubSubClient, + OrderProcessor orderProcessor, TaskExecutor taskExecutor) { + this.queuesClient = queuesClient; + this.pubSubClient = pubSubClient; this.orderProcessor = orderProcessor; this.taskExecutor = taskExecutor; } @@ -37,21 +41,40 @@ public void listen() { taskExecutor.execute(() -> { while (true) { try { - Transaction transaction = queue.CreateTransaction(); - TransactionMessagesResponse response = transaction.Receive(10, 10); - if (response.getMessage().getBody().length > 0) { - Order order = orderProcessor - .process((Order) Converter.FromByteArray(response.getMessage().getBody())); + QueuesPollRequest pollRequest = QueuesPollRequest.builder() + .channel("transactions") + .pollMaxMessages(1) + .pollWaitTimeoutInSeconds(10) + .build(); + + QueuesPollResponse response = queuesClient.receiveQueuesMessages(pollRequest); + + if (response.isError()) { + LOGGER.error("Error receiving message: {}", response.getError()); + Thread.sleep(10000); + continue; + } + + if (!response.getMessages().isEmpty()) { + QueueMessageReceived message = response.getMessages().get(0); + Order order = objectMapper.readValue(message.getBody(), Order.class); + order = orderProcessor.process(order); LOGGER.info("Processed: {}", order); + if (order.getStatus().equals(OrderStatus.CONFIRMED)) { - transaction.AckMessage(); - Event event = new Event(); - event.setEventId(response.getMessage().getMessageID()); - event.setBody(Converter.ToByteArray(order)); - LOGGER.info("Sending event: id={}", event.getEventId()); - channel.SendEvent(event); + message.ack(); + + byte[] eventBody = objectMapper.writeValueAsBytes(order); + EventMessage event = EventMessage.builder() + .channel("transactions") + .body(eventBody) + .id(message.getId()) + .build(); + + LOGGER.info("Sending event: id={}", message.getId()); + pubSubClient.sendEventsMessage(event); } else { - transaction.RejectMessage(); + message.reject(); } } else { LOGGER.info("No messages"); diff --git a/src/main/java/pl/piomin/service/kubemq/listener/TransactionAmountListener.java b/src/main/java/pl/piomin/service/kubemq/listener/TransactionAmountListener.java index 2645e23..b63fd5f 100644 --- a/src/main/java/pl/piomin/service/kubemq/listener/TransactionAmountListener.java +++ b/src/main/java/pl/piomin/service/kubemq/listener/TransactionAmountListener.java @@ -1,13 +1,8 @@ package pl.piomin.service.kubemq.listener; -import io.grpc.stub.StreamObserver; -import io.kubemq.sdk.basic.ServerAddressNotSuppliedException; -import io.kubemq.sdk.event.EventReceive; -import io.kubemq.sdk.event.Subscriber; -import io.kubemq.sdk.subscription.EventsStoreType; -import io.kubemq.sdk.subscription.SubscribeRequest; -import io.kubemq.sdk.subscription.SubscribeType; -import io.kubemq.sdk.tools.Converter; +import io.kubemq.sdk.pubsub.PubSubClient; +import io.kubemq.sdk.pubsub.EventsStoreSubscription; +import io.kubemq.sdk.pubsub.EventsStoreType; import jakarta.annotation.PostConstruct; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -16,54 +11,48 @@ import pl.piomin.service.kubemq.model.Order; import pl.piomin.service.kubemq.repository.AccountRepository; -import javax.net.ssl.SSLException; -import java.io.IOException; +import com.fasterxml.jackson.databind.ObjectMapper; @Component -public class TransactionAmountListener implements StreamObserver { +public class TransactionAmountListener { private static final Logger LOGGER = LoggerFactory.getLogger(TransactionAmountListener.class); + private static final ObjectMapper objectMapper = new ObjectMapper(); - private Subscriber subscriber; + private PubSubClient pubSubClient; private AccountRepository accountRepository; - public TransactionAmountListener(Subscriber subscriber, AccountRepository accountRepository) { - this.subscriber = subscriber; + public TransactionAmountListener(PubSubClient pubSubClient, AccountRepository accountRepository) { + this.pubSubClient = pubSubClient; this.accountRepository = accountRepository; } - @Override - public void onNext(EventReceive eventReceive) { - try { - Order order = (Order) Converter.FromByteArray(eventReceive.getBody()); - LOGGER.info("Amount event: {}", order); - accountRepository.updateBalance(order.getAccountIdTo(), (int) (order.getAmount() * 0.1)); - } catch (IOException | ClassNotFoundException | InsufficientFundsException e) { - LOGGER.error("Error", e); - } - } - - @Override - public void onError(Throwable throwable) { - - } - - @Override - public void onCompleted() { - - } - @PostConstruct public void init() { - SubscribeRequest subscribeRequest = new SubscribeRequest(); - subscribeRequest.setChannel("transactions"); - subscribeRequest.setClientID("amount-listener"); - subscribeRequest.setSubscribeType(SubscribeType.EventsStore); - subscribeRequest.setEventsStoreType(EventsStoreType.StartNewOnly); try { - subscriber.SubscribeToEvents(subscribeRequest, this); - } catch (ServerAddressNotSuppliedException | SSLException e) { - e.printStackTrace(); + EventsStoreSubscription subscription = EventsStoreSubscription.builder() + .channel("transactions") + .group("") + .eventsStoreType(EventsStoreType.StartNewOnly) + .onReceiveEventCallback(event -> { + try { + Order order = objectMapper.readValue(event.getBody(), Order.class); + LOGGER.info("Amount event: {}", order); + accountRepository.updateBalance(order.getAccountIdTo(), (int) (order.getAmount() * 0.1)); + } catch (InsufficientFundsException e) { + LOGGER.error("Error", e); + } catch (Exception e) { + LOGGER.error("Error processing event", e); + } + }) + .onErrorCallback(error -> { + LOGGER.error("Subscription error", error); + }) + .build(); + + pubSubClient.subscribeToEventsStore(subscription); + } catch (Exception e) { + LOGGER.error("Error initializing subscription", e); } } } diff --git a/src/main/java/pl/piomin/service/kubemq/listener/TransactionCountListener.java b/src/main/java/pl/piomin/service/kubemq/listener/TransactionCountListener.java index 78b46b1..d73acfd 100644 --- a/src/main/java/pl/piomin/service/kubemq/listener/TransactionCountListener.java +++ b/src/main/java/pl/piomin/service/kubemq/listener/TransactionCountListener.java @@ -1,13 +1,8 @@ package pl.piomin.service.kubemq.listener; -import io.grpc.stub.StreamObserver; -import io.kubemq.sdk.basic.ServerAddressNotSuppliedException; -import io.kubemq.sdk.event.EventReceive; -import io.kubemq.sdk.event.Subscriber; -import io.kubemq.sdk.subscription.EventsStoreType; -import io.kubemq.sdk.subscription.SubscribeRequest; -import io.kubemq.sdk.subscription.SubscribeType; -import io.kubemq.sdk.tools.Converter; +import io.kubemq.sdk.pubsub.PubSubClient; +import io.kubemq.sdk.pubsub.EventsStoreSubscription; +import io.kubemq.sdk.pubsub.EventsStoreType; import jakarta.annotation.PostConstruct; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -16,67 +11,61 @@ import pl.piomin.service.kubemq.model.Order; import pl.piomin.service.kubemq.repository.AccountRepository; -import javax.net.ssl.SSLException; -import java.io.IOException; +import com.fasterxml.jackson.databind.ObjectMapper; import java.util.HashMap; import java.util.Map; @Component -public class TransactionCountListener implements StreamObserver { +public class TransactionCountListener { private static final Logger LOGGER = LoggerFactory.getLogger(TransactionCountListener.class); + private static final ObjectMapper objectMapper = new ObjectMapper(); private Map transactionsCount = new HashMap<>(); - private Subscriber subscriber; + private PubSubClient pubSubClient; private AccountRepository accountRepository; - public TransactionCountListener(Subscriber subscriber, AccountRepository accountRepository) { - this.subscriber = subscriber; + public TransactionCountListener(PubSubClient pubSubClient, AccountRepository accountRepository) { + this.pubSubClient = pubSubClient; this.accountRepository = accountRepository; } - @Override - public void onNext(EventReceive eventReceive) { - try { - Order order = (Order) Converter.FromByteArray(eventReceive.getBody()); - LOGGER.info("Count event: {}", order); - Integer accountIdTo = order.getAccountIdTo(); - Integer noOfTransactions = transactionsCount.get(accountIdTo); - if (noOfTransactions == null) - transactionsCount.put(accountIdTo, 1); - else { - transactionsCount.put(accountIdTo, ++noOfTransactions); - if (noOfTransactions > 5) { - accountRepository.updateBalance(order.getAccountIdTo(), (int) (order.getAmount() * 0.1)); - LOGGER.info("Adding extra to: id={}", order.getAccountIdTo()); - } - } - } catch (IOException | ClassNotFoundException | InsufficientFundsException e) { - LOGGER.error("Error", e); - } - } - - @Override - public void onError(Throwable throwable) { - - } - - @Override - public void onCompleted() { - - } - @PostConstruct public void init() { - final SubscribeRequest subscribeRequest = new SubscribeRequest(); - subscribeRequest.setChannel("transactions"); - subscribeRequest.setClientID("count-listener-" + System.currentTimeMillis()); - subscribeRequest.setSubscribeType(SubscribeType.EventsStore); - subscribeRequest.setEventsStoreType(EventsStoreType.StartFromFirst); try { - subscriber.SubscribeToEvents(subscribeRequest, this); - } catch (ServerAddressNotSuppliedException | SSLException e) { - e.printStackTrace(); + EventsStoreSubscription subscription = EventsStoreSubscription.builder() + .channel("transactions") + .group("") + .eventsStoreType(EventsStoreType.StartFromFirst) + .onReceiveEventCallback(event -> { + try { + Order order = objectMapper.readValue(event.getBody(), Order.class); + LOGGER.info("Count event: {}", order); + Integer accountIdTo = order.getAccountIdTo(); + Integer noOfTransactions = transactionsCount.get(accountIdTo); + if (noOfTransactions == null) + transactionsCount.put(accountIdTo, 1); + else { + transactionsCount.put(accountIdTo, ++noOfTransactions); + if (noOfTransactions > 5) { + accountRepository.updateBalance(order.getAccountIdTo(), (int) (order.getAmount() * 0.1)); + LOGGER.info("Adding extra to: id={}", order.getAccountIdTo()); + } + } + } catch (InsufficientFundsException e) { + LOGGER.error("Error", e); + } catch (Exception e) { + LOGGER.error("Error processing event", e); + } + }) + .onErrorCallback(error -> { + LOGGER.error("Subscription error", error); + }) + .build(); + + pubSubClient.subscribeToEventsStore(subscription); + } catch (Exception e) { + LOGGER.error("Error initializing subscription", e); } } diff --git a/src/main/java/pl/piomin/service/kubemq/model/Account.java b/src/main/java/pl/piomin/service/kubemq/model/Account.java index 23ef793..7fe9332 100644 --- a/src/main/java/pl/piomin/service/kubemq/model/Account.java +++ b/src/main/java/pl/piomin/service/kubemq/model/Account.java @@ -1,22 +1,65 @@ package pl.piomin.service.kubemq.model; -import lombok.AllArgsConstructor; -import lombok.EqualsAndHashCode; -import lombok.Getter; -import lombok.NoArgsConstructor; -import lombok.Setter; -import lombok.ToString; - -@Getter -@Setter -@NoArgsConstructor -@AllArgsConstructor -@EqualsAndHashCode -@ToString +import java.util.Objects; + public class Account { private Integer id; private String number; - @EqualsAndHashCode.Exclude private int balance; + private int balance; + + public Account() { + } + + public Account(Integer id, String number, int balance) { + this.id = id; + this.number = number; + this.balance = balance; + } + + public Integer getId() { + return id; + } + + public void setId(Integer id) { + this.id = id; + } + + public String getNumber() { + return number; + } + + public void setNumber(String number) { + this.number = number; + } + + public int getBalance() { + return balance; + } + + public void setBalance(int balance) { + this.balance = balance; + } + + @Override + public boolean equals(Object o) { + if (this == o) return true; + if (o == null || getClass() != o.getClass()) return false; + Account account = (Account) o; + return Objects.equals(id, account.id) && Objects.equals(number, account.number); + } + + @Override + public int hashCode() { + return Objects.hash(id, number); + } + @Override + public String toString() { + return "Account{" + + "id=" + id + + ", number='" + number + '\'' + + ", balance=" + balance + + '}'; + } } diff --git a/src/main/java/pl/piomin/service/kubemq/model/Order.java b/src/main/java/pl/piomin/service/kubemq/model/Order.java index fb3b961..71a97de 100644 --- a/src/main/java/pl/piomin/service/kubemq/model/Order.java +++ b/src/main/java/pl/piomin/service/kubemq/model/Order.java @@ -1,17 +1,8 @@ package pl.piomin.service.kubemq.model; -import lombok.Builder; -import lombok.Getter; -import lombok.Setter; -import lombok.ToString; - import java.io.Serializable; import java.time.LocalDateTime; -@Getter -@Setter -@Builder -@ToString public class Order implements Serializable { private OrderType type; @@ -22,4 +13,85 @@ public class Order implements Serializable { private String id; private OrderStatus status; + public Order() { + } + + public Order(OrderType type, Integer accountIdFrom, Integer accountIdTo, LocalDateTime date, int amount, String id, OrderStatus status) { + this.type = type; + this.accountIdFrom = accountIdFrom; + this.accountIdTo = accountIdTo; + this.date = date; + this.amount = amount; + this.id = id; + this.status = status; + } + + public OrderType getType() { + return type; + } + + public void setType(OrderType type) { + this.type = type; + } + + public Integer getAccountIdFrom() { + return accountIdFrom; + } + + public void setAccountIdFrom(Integer accountIdFrom) { + this.accountIdFrom = accountIdFrom; + } + + public Integer getAccountIdTo() { + return accountIdTo; + } + + public void setAccountIdTo(Integer accountIdTo) { + this.accountIdTo = accountIdTo; + } + + public LocalDateTime getDate() { + return date; + } + + public void setDate(LocalDateTime date) { + this.date = date; + } + + public int getAmount() { + return amount; + } + + public void setAmount(int amount) { + this.amount = amount; + } + + public String getId() { + return id; + } + + public void setId(String id) { + this.id = id; + } + + public OrderStatus getStatus() { + return status; + } + + public void setStatus(OrderStatus status) { + this.status = status; + } + + @Override + public String toString() { + return "Order{" + + "type=" + type + + ", accountIdFrom=" + accountIdFrom + + ", accountIdTo=" + accountIdTo + + ", date=" + date + + ", amount=" + amount + + ", id='" + id + '\'' + + ", status=" + status + + '}'; + } } diff --git a/src/test/java/pl/piomin/service/kubemq/KubeMQIntegrationTest.java b/src/test/java/pl/piomin/service/kubemq/KubeMQIntegrationTest.java new file mode 100644 index 0000000..c0b216e --- /dev/null +++ b/src/test/java/pl/piomin/service/kubemq/KubeMQIntegrationTest.java @@ -0,0 +1,290 @@ +package pl.piomin.service.kubemq; + +import com.fasterxml.jackson.databind.ObjectMapper; +import com.fasterxml.jackson.databind.SerializationFeature; +import com.fasterxml.jackson.datatype.jsr310.JavaTimeModule; +import io.kubemq.sdk.pubsub.EventMessage; +import io.kubemq.sdk.pubsub.EventSendResult; +import io.kubemq.sdk.pubsub.PubSubClient; +import io.kubemq.sdk.queues.*; +import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.testcontainers.containers.GenericContainer; +import org.testcontainers.containers.wait.strategy.Wait; +import org.testcontainers.junit.jupiter.Container; +import org.testcontainers.junit.jupiter.Testcontainers; +import pl.piomin.service.kubemq.model.Order; +import pl.piomin.service.kubemq.model.OrderStatus; +import pl.piomin.service.kubemq.model.OrderType; + +import java.time.LocalDateTime; + +import static org.junit.jupiter.api.Assertions.*; + +@Testcontainers +class KubeMQIntegrationTest { + + private static final Logger LOGGER = LoggerFactory.getLogger(KubeMQIntegrationTest.class); + private static final ObjectMapper objectMapper = new ObjectMapper() + .registerModule(new JavaTimeModule()) + .disable(SerializationFeature.WRITE_DATES_AS_TIMESTAMPS); + + @Container + static GenericContainer kubemqContainer = new GenericContainer<>("kubemq/kubemq-community:latest") + .withExposedPorts(50000, 8080, 9090) + .withEnv("KUBEMQ_TOKEN", "") + .withEnv("KUBEMQ_LICENSE", "") + .waitingFor(Wait.forListeningPort()) + .withStartupTimeout(java.time.Duration.ofMinutes(2)); + + private QueuesClient queuesClient; + private PubSubClient pubSubClient; + private String kubemqAddress; + + @BeforeEach + void setUp() { + kubemqAddress = kubemqContainer.getHost() + ":" + kubemqContainer.getMappedPort(50000); + LOGGER.info("KubeMQ container started at: {}", kubemqAddress); + + queuesClient = QueuesClient.builder() + .address(kubemqAddress) + .clientId("test-queues-client") + .build(); + + pubSubClient = PubSubClient.builder() + .address(kubemqAddress) + .clientId("test-pubsub-client") + .build(); + } + + @AfterEach + void tearDown() { + if (queuesClient != null) { + try { + queuesClient.close(); + } catch (Exception e) { + LOGGER.warn("Error closing queues client", e); + } + } + if (pubSubClient != null) { + try { + pubSubClient.close(); + } catch (Exception e) { + LOGGER.warn("Error closing pubsub client", e); + } + } + } + + @Test + void shouldSendAndReceiveQueueMessage() throws Exception { + // Given + String testChannel = "test-transactions"; + Order testOrder = new Order( + OrderType.TRANSFER, + 1, + 2, + LocalDateTime.now(), + 100, + null, + OrderStatus.ACCEPTED + ); + + byte[] orderBytes = objectMapper.writeValueAsBytes(testOrder); + + // When - Send message + QueueMessage message = QueueMessage.builder() + .channel(testChannel) + .body(orderBytes) + .metadata("test-message") + .build(); + + QueueSendResult sendResult = queuesClient.sendQueuesMessage(message); + + // Then - Verify send + assertNotNull(sendResult); + assertNotNull(sendResult.getId()); + assertFalse(sendResult.isError()); + LOGGER.info("Message sent with ID: {}", sendResult.getId()); + + // When - Receive message + QueuesPollRequest pollRequest = QueuesPollRequest.builder() + .channel(testChannel) + .pollMaxMessages(1) + .pollWaitTimeoutInSeconds(10) + .build(); + + QueuesPollResponse pollResponse = queuesClient.receiveQueuesMessages(pollRequest); + + // Then - Verify receive + assertNotNull(pollResponse); + assertFalse(pollResponse.isError()); + assertFalse(pollResponse.getMessages().isEmpty()); + + QueueMessageReceived receivedMessage = pollResponse.getMessages().get(0); + assertNotNull(receivedMessage); + assertNotNull(receivedMessage.getBody()); + + Order receivedOrder = objectMapper.readValue(receivedMessage.getBody(), Order.class); + assertEquals(testOrder.getAccountIdFrom(), receivedOrder.getAccountIdFrom()); + assertEquals(testOrder.getAccountIdTo(), receivedOrder.getAccountIdTo()); + assertEquals(testOrder.getAmount(), receivedOrder.getAmount()); + assertEquals(testOrder.getType(), receivedOrder.getType()); + + LOGGER.info("Message received: {}", receivedOrder); + + // Acknowledge the message + receivedMessage.ack(); + } + + @Test + void shouldSendAndReceiveMultipleQueueMessages() throws Exception { + // Given + String testChannel = "test-bulk-transactions"; + int messageCount = 5; + + // When - Send multiple messages + for (int i = 0; i < messageCount; i++) { + Order order = new Order( + OrderType.TRANSFER, + 1, + 2, + LocalDateTime.now(), + 100 + i, + null, + OrderStatus.ACCEPTED + ); + + byte[] orderBytes = objectMapper.writeValueAsBytes(order); + QueueMessage message = QueueMessage.builder() + .channel(testChannel) + .body(orderBytes) + .build(); + + QueueSendResult result = queuesClient.sendQueuesMessage(message); + assertFalse(result.isError()); + } + + LOGGER.info("Sent {} messages", messageCount); + + // When - Receive messages + QueuesPollRequest pollRequest = QueuesPollRequest.builder() + .channel(testChannel) + .pollMaxMessages(messageCount) + .pollWaitTimeoutInSeconds(10) + .build(); + + QueuesPollResponse pollResponse = queuesClient.receiveQueuesMessages(pollRequest); + + // Then + assertNotNull(pollResponse); + assertFalse(pollResponse.isError()); + assertEquals(messageCount, pollResponse.getMessages().size()); + + LOGGER.info("Received {} messages", pollResponse.getMessages().size()); + + // Acknowledge all messages + for (QueueMessageReceived msg : pollResponse.getMessages()) { + msg.ack(); + } + } + + @Test + void shouldSendEventMessage() throws Exception { + // Given + String testChannel = "test-events"; + Order testOrder = new Order( + OrderType.TRANSFER, + 1, + 2, + LocalDateTime.now(), + 200, + "test-event-id", + OrderStatus.CONFIRMED + ); + + byte[] orderBytes = objectMapper.writeValueAsBytes(testOrder); + + // When + EventMessage eventMessage = EventMessage.builder() + .channel(testChannel) + .body(orderBytes) + .metadata("test-event") + .build(); + + // Send event - method returns void in SDK v2 + pubSubClient.sendEventsMessage(eventMessage); + + // Then - If no exception thrown, the event was sent successfully + LOGGER.info("Event sent successfully to channel: {}", testChannel); + } + + @Test + void shouldRejectQueueMessage() throws Exception { + // Given + String testChannel = "test-reject"; + Order testOrder = new Order( + OrderType.TRANSFER, + 1, + 2, + LocalDateTime.now(), + 300, + null, + OrderStatus.ACCEPTED + ); + + byte[] orderBytes = objectMapper.writeValueAsBytes(testOrder); + + // When - Send message + QueueMessage message = QueueMessage.builder() + .channel(testChannel) + .body(orderBytes) + .build(); + + queuesClient.sendQueuesMessage(message); + + // Receive message + QueuesPollRequest pollRequest = QueuesPollRequest.builder() + .channel(testChannel) + .pollMaxMessages(1) + .pollWaitTimeoutInSeconds(10) + .build(); + + QueuesPollResponse pollResponse = queuesClient.receiveQueuesMessages(pollRequest); + assertFalse(pollResponse.getMessages().isEmpty()); + + QueueMessageReceived receivedMessage = pollResponse.getMessages().get(0); + + // Reject the message + receivedMessage.reject(); + LOGGER.info("Message rejected successfully"); + + // The message should be available again after rejection + QueuesPollResponse secondPollResponse = queuesClient.receiveQueuesMessages(pollRequest); + assertFalse(secondPollResponse.isError()); + // Note: Depending on KubeMQ configuration, the rejected message might be re-queued + } + + @Test + void shouldHandleEmptyQueuePoll() throws Exception { + // Given + String emptyChannel = "empty-channel-" + System.currentTimeMillis(); + + // When + QueuesPollRequest pollRequest = QueuesPollRequest.builder() + .channel(emptyChannel) + .pollMaxMessages(1) + .pollWaitTimeoutInSeconds(2) + .build(); + + QueuesPollResponse pollResponse = queuesClient.receiveQueuesMessages(pollRequest); + + // Then + assertNotNull(pollResponse); + assertFalse(pollResponse.isError()); + assertTrue(pollResponse.getMessages().isEmpty()); + LOGGER.info("Empty queue poll handled correctly"); + } +}