Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
13 changes: 8 additions & 5 deletions .circleci/config.yml
Original file line number Diff line number Diff line change
Expand Up @@ -8,12 +8,15 @@ jobs:
- checkout
- run:
name: Analyze on SonarCloud
command: mvn verify sonar:sonar
command: mvn verify sonar:sonar -DskipTests

executors:
jdk:
docker:
- image: 'cimg/openjdk:21.0.9'
vm:
machine:
image: ubuntu-2204:current
environment:
architecture: "amd64"
platform: "linux/amd64"

orbs:
maven: circleci/maven@2.1.1
Expand All @@ -22,6 +25,6 @@ workflows:
maven_test:
jobs:
- maven/test:
executor: jdk
executor: vm
- build:
context: SonarCloud
28 changes: 22 additions & 6 deletions pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -26,26 +26,42 @@
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
<version>1.18.42</version>
</dependency>
<dependency>
<groupId>io.kubemq.sdk</groupId>
<artifactId>kubemq-sdk-Java</artifactId>
<version>1.0.5</version>
<version>2.1.1</version>
</dependency>
<dependency>
<groupId>io.grpc</groupId>
<artifactId>grpc-stub</artifactId>
<version>1.79.0</version>
</dependency>
<dependency>
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-databind</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.testcontainers</groupId>
<artifactId>testcontainers</artifactId>
<version>1.20.4</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.testcontainers</groupId>
<artifactId>junit-jupiter</artifactId>
<version>1.20.4</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.awaitility</groupId>
<artifactId>awaitility</artifactId>
<scope>test</scope>
</dependency>
</dependencies>

<build>
Expand Down
Original file line number Diff line number Diff line change
@@ -1,35 +1,31 @@
package pl.piomin.service.kubemq.config;

import io.kubemq.sdk.basic.ServerAddressNotSuppliedException;
import io.kubemq.sdk.event.Channel;
import io.kubemq.sdk.event.Subscriber;
import io.kubemq.sdk.queue.Queue;

import io.kubemq.sdk.pubsub.PubSubClient;
import io.kubemq.sdk.queues.QueuesClient;
import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

import javax.net.ssl.SSLException;

@Configuration
@ConfigurationProperties("kubemq")
public class KubeMQConfiguration {

private String address;

@Bean
public Queue queue() throws ServerAddressNotSuppliedException, SSLException {
return new Queue("transactions", "orders", address);
}

@Bean
public Subscriber subscriber() {
return new Subscriber(address);
public QueuesClient queuesClient() {
return QueuesClient.builder()
.address(address)
.clientId("orders-service-queues")
.build();
}

@Bean
public Channel channel() {
return new Channel("transactions", "orders", true, address);
public PubSubClient pubSubClient() {
return PubSubClient.builder()
.address(address)
.clientId("orders-service-pubsub")
.build();
}

String getAddress() {
Expand Down
Original file line number Diff line number Diff line change
@@ -1,10 +1,8 @@
package pl.piomin.service.kubemq.controller;

import io.kubemq.sdk.basic.ServerAddressNotSuppliedException;
import io.kubemq.sdk.queue.Message;
import io.kubemq.sdk.queue.Queue;
import io.kubemq.sdk.queue.SendMessageResult;
import io.kubemq.sdk.tools.Converter;
import io.kubemq.sdk.queues.QueuesClient;
import io.kubemq.sdk.queues.QueueMessage;
import io.kubemq.sdk.queues.QueueSendResult;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.web.bind.annotation.PostMapping;
Expand All @@ -14,30 +12,37 @@
import pl.piomin.service.kubemq.model.Order;
import pl.piomin.service.kubemq.model.OrderStatus;

import java.io.IOException;
import com.fasterxml.jackson.databind.ObjectMapper;

@RestController
@RequestMapping("/orders")
public class OrderController {

private static final Logger LOGGER = LoggerFactory.getLogger(OrderController.class);
private static final ObjectMapper objectMapper = new ObjectMapper();

private Queue queue;
private QueuesClient queuesClient;

public OrderController(Queue queue) {
this.queue = queue;
public OrderController(QueuesClient queuesClient) {
this.queuesClient = queuesClient;
}

@PostMapping
public Order sendOrder(@RequestBody Order order) {
try {
LOGGER.info("Sending: {}", order);
final SendMessageResult result = queue.SendQueueMessage(new Message()
.setBody(Converter.ToByteArray(order)));
order.setId(result.getMessageID());
byte[] orderBytes = objectMapper.writeValueAsBytes(order);

QueueMessage message = QueueMessage.builder()
.channel("transactions")
.body(orderBytes)
.build();

QueueSendResult result = queuesClient.sendQueuesMessage(message);
order.setId(result.getId());
order.setStatus(OrderStatus.ACCEPTED);
LOGGER.info("Sent: {}", order);
} catch (ServerAddressNotSuppliedException | IOException e) {
} catch (Exception e) {
LOGGER.error("Error sending", e);
order.setStatus(OrderStatus.ERROR);
}
Expand Down
69 changes: 46 additions & 23 deletions src/main/java/pl/piomin/service/kubemq/listener/OrderListener.java
Original file line number Diff line number Diff line change
@@ -1,11 +1,11 @@
package pl.piomin.service.kubemq.listener;

import io.kubemq.sdk.event.Channel;
import io.kubemq.sdk.event.Event;
import io.kubemq.sdk.queue.Queue;
import io.kubemq.sdk.queue.Transaction;
import io.kubemq.sdk.queue.TransactionMessagesResponse;
import io.kubemq.sdk.tools.Converter;
import io.kubemq.sdk.pubsub.PubSubClient;
import io.kubemq.sdk.pubsub.EventMessage;
import io.kubemq.sdk.queues.QueuesClient;
import io.kubemq.sdk.queues.QueuesPollRequest;
import io.kubemq.sdk.queues.QueuesPollResponse;
import io.kubemq.sdk.queues.QueueMessageReceived;
import jakarta.annotation.PostConstruct;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Expand All @@ -15,19 +15,23 @@
import pl.piomin.service.kubemq.model.OrderStatus;
import pl.piomin.service.kubemq.service.OrderProcessor;

import com.fasterxml.jackson.databind.ObjectMapper;

@Component
public class OrderListener {

private static final Logger LOGGER = LoggerFactory.getLogger(OrderListener.class);
private static final ObjectMapper objectMapper = new ObjectMapper();

private Queue queue;
private Channel channel;
private QueuesClient queuesClient;
private PubSubClient pubSubClient;
private OrderProcessor orderProcessor;
private TaskExecutor taskExecutor;

public OrderListener(Queue queue, Channel channel, OrderProcessor orderProcessor, TaskExecutor taskExecutor) {
this.queue = queue;
this.channel = channel;
public OrderListener(QueuesClient queuesClient, PubSubClient pubSubClient,
OrderProcessor orderProcessor, TaskExecutor taskExecutor) {
this.queuesClient = queuesClient;
this.pubSubClient = pubSubClient;
this.orderProcessor = orderProcessor;
this.taskExecutor = taskExecutor;
}
Expand All @@ -37,21 +41,40 @@ public void listen() {
taskExecutor.execute(() -> {
while (true) {
try {
Transaction transaction = queue.CreateTransaction();
TransactionMessagesResponse response = transaction.Receive(10, 10);
if (response.getMessage().getBody().length > 0) {
Order order = orderProcessor
.process((Order) Converter.FromByteArray(response.getMessage().getBody()));
QueuesPollRequest pollRequest = QueuesPollRequest.builder()
.channel("transactions")
.pollMaxMessages(1)
.pollWaitTimeoutInSeconds(10)
.build();

QueuesPollResponse response = queuesClient.receiveQueuesMessages(pollRequest);

if (response.isError()) {
LOGGER.error("Error receiving message: {}", response.getError());
Thread.sleep(10000);
continue;
}

if (!response.getMessages().isEmpty()) {
QueueMessageReceived message = response.getMessages().get(0);
Order order = objectMapper.readValue(message.getBody(), Order.class);
order = orderProcessor.process(order);
LOGGER.info("Processed: {}", order);

if (order.getStatus().equals(OrderStatus.CONFIRMED)) {
transaction.AckMessage();
Event event = new Event();
event.setEventId(response.getMessage().getMessageID());
event.setBody(Converter.ToByteArray(order));
LOGGER.info("Sending event: id={}", event.getEventId());
channel.SendEvent(event);
message.ack();

byte[] eventBody = objectMapper.writeValueAsBytes(order);
EventMessage event = EventMessage.builder()
.channel("transactions")
.body(eventBody)
.id(message.getId())
.build();

LOGGER.info("Sending event: id={}", message.getId());
pubSubClient.sendEventsMessage(event);
} else {
transaction.RejectMessage();
message.reject();
}
} else {
LOGGER.info("No messages");
Expand Down
Original file line number Diff line number Diff line change
@@ -1,13 +1,8 @@
package pl.piomin.service.kubemq.listener;

import io.grpc.stub.StreamObserver;
import io.kubemq.sdk.basic.ServerAddressNotSuppliedException;
import io.kubemq.sdk.event.EventReceive;
import io.kubemq.sdk.event.Subscriber;
import io.kubemq.sdk.subscription.EventsStoreType;
import io.kubemq.sdk.subscription.SubscribeRequest;
import io.kubemq.sdk.subscription.SubscribeType;
import io.kubemq.sdk.tools.Converter;
import io.kubemq.sdk.pubsub.PubSubClient;
import io.kubemq.sdk.pubsub.EventsStoreSubscription;
import io.kubemq.sdk.pubsub.EventsStoreType;
import jakarta.annotation.PostConstruct;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Expand All @@ -16,54 +11,48 @@
import pl.piomin.service.kubemq.model.Order;
import pl.piomin.service.kubemq.repository.AccountRepository;

import javax.net.ssl.SSLException;
import java.io.IOException;
import com.fasterxml.jackson.databind.ObjectMapper;

@Component
public class TransactionAmountListener implements StreamObserver<EventReceive> {
public class TransactionAmountListener {

private static final Logger LOGGER = LoggerFactory.getLogger(TransactionAmountListener.class);
private static final ObjectMapper objectMapper = new ObjectMapper();

private Subscriber subscriber;
private PubSubClient pubSubClient;
private AccountRepository accountRepository;

public TransactionAmountListener(Subscriber subscriber, AccountRepository accountRepository) {
this.subscriber = subscriber;
public TransactionAmountListener(PubSubClient pubSubClient, AccountRepository accountRepository) {
this.pubSubClient = pubSubClient;
this.accountRepository = accountRepository;
}

@Override
public void onNext(EventReceive eventReceive) {
try {
Order order = (Order) Converter.FromByteArray(eventReceive.getBody());
LOGGER.info("Amount event: {}", order);
accountRepository.updateBalance(order.getAccountIdTo(), (int) (order.getAmount() * 0.1));
} catch (IOException | ClassNotFoundException | InsufficientFundsException e) {
LOGGER.error("Error", e);
}
}

@Override
public void onError(Throwable throwable) {

}

@Override
public void onCompleted() {

}

@PostConstruct
public void init() {
SubscribeRequest subscribeRequest = new SubscribeRequest();
subscribeRequest.setChannel("transactions");
subscribeRequest.setClientID("amount-listener");
subscribeRequest.setSubscribeType(SubscribeType.EventsStore);
subscribeRequest.setEventsStoreType(EventsStoreType.StartNewOnly);
try {
subscriber.SubscribeToEvents(subscribeRequest, this);
} catch (ServerAddressNotSuppliedException | SSLException e) {
e.printStackTrace();
EventsStoreSubscription subscription = EventsStoreSubscription.builder()
.channel("transactions")
.group("")
.eventsStoreType(EventsStoreType.StartNewOnly)
.onReceiveEventCallback(event -> {
try {
Order order = objectMapper.readValue(event.getBody(), Order.class);
LOGGER.info("Amount event: {}", order);
accountRepository.updateBalance(order.getAccountIdTo(), (int) (order.getAmount() * 0.1));
} catch (InsufficientFundsException e) {
LOGGER.error("Error", e);
} catch (Exception e) {
LOGGER.error("Error processing event", e);
}
})
.onErrorCallback(error -> {
LOGGER.error("Subscription error", error);
})
.build();

pubSubClient.subscribeToEventsStore(subscription);
} catch (Exception e) {
LOGGER.error("Error initializing subscription", e);
}
}
}
Loading