diff --git a/Dockerfile b/Dockerfile
index 76cecc5..110287e 100644
--- a/Dockerfile
+++ b/Dockerfile
@@ -1,7 +1,5 @@
-FROM adoptopenjdk/openjdk11:jdk-11.0.2.9-slim
+FROM adoptopenjdk/openjdk11:jre-11.0.8_10-alpine
WORKDIR /opt
-ENV PORT 8080
-EXPOSE 8080
COPY kafka.client.truststore.jks /opt/kafka.client.truststore.jks
COPY target/*.jar /opt/app.jar
-ENTRYPOINT exec java $JAVA_OPTS -jar app.jar
\ No newline at end of file
+ENTRYPOINT exec java $JAVA_OPTS -jar app.jar --spring.profiles.active=aws --spring.config.location=/tmp/
\ No newline at end of file
diff --git a/README.md b/README.md
deleted file mode 100644
index a4588e3..0000000
--- a/README.md
+++ /dev/null
@@ -1,26 +0,0 @@
-# data-ingestion
-
-bin/kafka-topics.sh --create --bootstrap-server localhost:9092 --replication-factor 1 --partitions 10 --topic ingestion
-
-bin/kafka-console-consumer.sh --topic ingestion --from-beginning --bootstrap-server localhost:9092
-
-docker run --rm -e ADV_HOST=localhost -p 3030:3030 -p 8082:8082 -p 9092:9092 -p 2181:2181 -p 8081:8081 -e RUNNING_SAMPLEDATA=0 -e RUNTESTS=0 -e CONNECT_PORT=0 lensesio/fast-data-dev
-
-docker run -d --name es762 -p 9200:9200 -e "discovery.type=single-node" elasticsearch:7.6.2
-
-curl -X POST "http://localhost:9200/extrato/_search" -d'{"query":{"query_string":{"query": "8bfca9fa-ecdf-4f18-bfe3-e7f3c0360a40"}}}' -H 'Content-type:application/json'
-
-
-curl -X POST "http://localhost:9200/extrato/_search" -d'{"query":{"query_string":{"query": "946eccbc-a87b-4aa5-b856-a13ca34ced98"}}}' -H 'Content-type:application/json'
-
-docker run -d --name grafana -p 3000:3000 grafana/grafana
-
-docker run -d --name prometheus -p 9090:9090 -v /home/capiau3/Desktop/Projetos/prometheus.yml:/etc/prometheus/prometheus.yml prom/prometheus --config.file=/etc/prometheus/prometheus.yml
-
-ifconfig -a
-
-curl -X POST "http://localhost:9200/extrato/_search?pretty"
-
-bin/kafka-consumer-groups.sh --bootstrap-server localhost:9092 --describe --group data-ingestion
-
-curl localhost:8080/geraevento/10000/1000000
\ No newline at end of file
diff --git a/load-generate.yaml b/load-generate.yaml
new file mode 100644
index 0000000..cc3a02e
--- /dev/null
+++ b/load-generate.yaml
@@ -0,0 +1,61 @@
+apiVersion: apps/v1
+kind: Deployment
+metadata:
+ name: load-generate
+ labels:
+ app: load-generate
+ namespace: load-generate
+spec:
+ replicas: 0
+ selector:
+ matchLabels:
+ app: load-generate
+ template:
+ metadata:
+ labels:
+ app: load-generate
+ spec:
+ serviceAccountName: statement-demo
+ containers:
+ - name: load-generate
+ image: 837108680928.dkr.ecr.us-east-1.amazonaws.com/load-generate:latest
+ imagePullPolicy: Always
+ volumeMounts:
+ - mountPath: /tmp/application-aws.properties
+ name: config
+ subPath: application-aws.properties
+ resources:
+ limits:
+ memory: "3Gi"
+ cpu: 2
+ requests:
+ memory: "3Gi"
+ cpu: 2
+ env:
+ - name: JAVA_OPTS
+ value: "-XX:MaxRAMPercentage=90.0"
+ - name: KAFKA_BOOTSTRAP
+ valueFrom:
+ configMapKeyRef:
+ name: kafka
+ key: bootstrap
+ - name: KAFKA_GROUP_ID
+ value: "load-generate"
+ - name: KAFKA_MAX_POOL_RECORDS
+ value: "1"
+ - name: KAFKA_CONCURRENCY
+ value: "10"
+ - name: KAFKA_PRODUCER_TOPIC
+ value: "generate"
+ - name: KAFKA_CONSUMER_TOPIC
+ value: "load"
+ - name: CLOUDWATCH_NAMESPACE
+ value: "statement12"
+ - name: POD_NAME
+ valueFrom:
+ fieldRef:
+ fieldPath: metadata.name
+ volumes:
+ - configMap:
+ name: application.properties
+ name: config
diff --git a/pom.xml b/pom.xml
index be6ec43..2611651 100644
--- a/pom.xml
+++ b/pom.xml
@@ -15,7 +15,8 @@
Demo project for Spring Boot
- 1.8
+ 11
+ 1.11.887
@@ -23,15 +24,10 @@
org.springframework.boot
spring-boot-starter-actuator
-
- org.springframework.boot
- spring-boot-starter-web
-
org.springframework.kafka
spring-kafka
-
org.projectlombok
lombok
@@ -42,50 +38,71 @@
commons-lang3
3.11
-
- io.micrometer
- micrometer-registry-prometheus
-
com.github.javafaker
javafaker
1.0.2
-
- org.springframework.boot
- spring-boot-starter-test
- test
-
-
- org.junit.vintage
- junit-vintage-engine
-
-
-
org.springframework.kafka
spring-kafka-test
test
-
- junit
- junit
- test
-
- junit
- junit
- test
+ io.micrometer
+ micrometer-core
+ 1.5.5
+
+
+ com.amazonaws
+ aws-java-sdk-bom
+ ${aws.sdk.version}
+ pom
+
+
+ com.amazonaws
+ aws-java-sdk-sts
+ ${aws.sdk.version}
+
+
+ com.amazonaws
+ aws-java-sdk-cloudwatch
+ ${aws.sdk.version}
+
+
+ org.springframework.cloud
+ spring-cloud-starter-aws
+ 2.2.4.RELEASE
+
+
+ io.micrometer
+ micrometer-registry-cloudwatch
+ 1.5.5
-
-
-
- org.springframework.boot
- spring-boot-maven-plugin
-
-
+
+
+ org.apache.maven.plugins
+ maven-jar-plugin
+
+
+ **/application*.properties
+
+
+
+
+ org.springframework.boot
+ spring-boot-maven-plugin
+
+
+
+ repackage
+
+
+
+
+
diff --git a/prometheus.yml b/prometheus.yml
deleted file mode 100644
index f19aa03..0000000
--- a/prometheus.yml
+++ /dev/null
@@ -1,26 +0,0 @@
-# my global config
-global:
- scrape_interval: 15s # Set the scrape interval to every 15 seconds. Default is every 1 minute.
- evaluation_interval: 15s # Evaluate rules every 15 seconds. The default is every 1 minute.
- # scrape_timeout is set to the global default (10s).
-
-# Load rules once and periodically evaluate them according to the global 'evaluation_interval'.
-rule_files:
- # - "first_rules.yml"
- # - "second_rules.yml"
-
-# A scrape configuration containing exactly one endpoint to scrape:
-# Here it's Prometheus itself.
-scrape_configs:
- # The job name is added as a label `job=` to any timeseries scraped from this config.
- - job_name: 'prometheus'
- # metrics_path defaults to '/metrics'
- # scheme defaults to 'http'.
- static_configs:
- - targets: ['127.0.0.1:9090']
-
- - job_name: 'spring-actuator'
- metrics_path: '/actuator/prometheus'
- scrape_interval: 5s
- static_configs:
- - targets: ['192.168.100.123:8080']
diff --git a/src/main/java/br/com/exemplo/dataingestion/LoadGenerateApplication.java b/src/main/java/br/com/exemplo/dataingestion/LoadGenerateApplication.java
index 9ca4741..b9635c6 100644
--- a/src/main/java/br/com/exemplo/dataingestion/LoadGenerateApplication.java
+++ b/src/main/java/br/com/exemplo/dataingestion/LoadGenerateApplication.java
@@ -1,42 +1,14 @@
package br.com.exemplo.dataingestion;
-import br.com.exemplo.dataingestion.adapters.controllers.servers.GenerateLoadController;
-import lombok.RequiredArgsConstructor;
-import lombok.extern.slf4j.Slf4j;
-import org.springframework.beans.factory.annotation.Value;
-import org.springframework.boot.CommandLineRunner;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
-import org.springframework.boot.context.event.ApplicationReadyEvent;
-import org.springframework.context.event.EventListener;
-import javax.annotation.PostConstruct;
+import lombok.RequiredArgsConstructor;
@SpringBootApplication
@RequiredArgsConstructor
-@Slf4j
public class LoadGenerateApplication {
-
- @Value("${registros.total:1000000}")
- private int registro;
-
- @Value("${contas.total:1000}")
- private int contas;
-
- @Value("${dias.total:90}")
- private int dias;
-
- private final GenerateLoadController generateLoadController;
-
public static void main(String[] args) {
SpringApplication.run(LoadGenerateApplication.class, args);
}
-
-
- @EventListener(ApplicationReadyEvent.class)
- public void run() throws Exception {
- log.info("Iniciando a produção de {} com {} contas e com {} dias retroativos",registro,contas,dias);
- generateLoadController.geraEvento(contas,registro,dias);
- log.info("Liberando comando da aplicação");
- }
}
diff --git a/src/main/java/br/com/exemplo/dataingestion/adapters/beans/KafkaConfig.java b/src/main/java/br/com/exemplo/dataingestion/adapters/beans/KafkaConfig.java
index de8230b..41b0ad5 100644
--- a/src/main/java/br/com/exemplo/dataingestion/adapters/beans/KafkaConfig.java
+++ b/src/main/java/br/com/exemplo/dataingestion/adapters/beans/KafkaConfig.java
@@ -1,27 +1,24 @@
package br.com.exemplo.dataingestion.adapters.beans;
-import br.com.exemplo.dataingestion.adapters.events.entities.DataLancamentoEvent;
-import org.apache.kafka.clients.producer.ProducerConfig;
-import org.springframework.beans.factory.annotation.Qualifier;
+import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
-import org.springframework.context.annotation.Scope;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.core.ConsumerFactory;
-import org.springframework.kafka.core.KafkaTemplate;
-import org.springframework.kafka.core.ProducerFactory;
-import java.util.HashMap;
-import java.util.Map;
-import java.util.UUID;
+import br.com.exemplo.dataingestion.adapters.events.entities.LoadEntity;
@Configuration
public class KafkaConfig {
+ @Value("${spring.kafka.listener.concurrency}")
+ private int concurrency;
+
+
@Bean
- @Scope(value = "prototype")
- public KafkaTemplate kafkaTemplate1(ProducerFactory producerFactory){
- Map map = new HashMap<>();
- map.put(ProducerConfig.CLIENT_ID_CONFIG,UUID.randomUUID().toString());
- return new KafkaTemplate(producerFactory,false,map);
+ public ConcurrentKafkaListenerContainerFactory kafkaListenerContainerFactory(ConsumerFactory consumerFactory){
+ ConcurrentKafkaListenerContainerFactory factory = new ConcurrentKafkaListenerContainerFactory();
+ factory.setConsumerFactory(consumerFactory);
+ factory.setConcurrency(concurrency);
+ return factory;
}
}
diff --git a/src/main/java/br/com/exemplo/dataingestion/adapters/beans/MicrometerConfig.java b/src/main/java/br/com/exemplo/dataingestion/adapters/beans/MicrometerConfig.java
index e6d9c98..7ce24d3 100644
--- a/src/main/java/br/com/exemplo/dataingestion/adapters/beans/MicrometerConfig.java
+++ b/src/main/java/br/com/exemplo/dataingestion/adapters/beans/MicrometerConfig.java
@@ -1,9 +1,10 @@
package br.com.exemplo.dataingestion.adapters.beans;
-import io.micrometer.core.instrument.simple.SimpleMeterRegistry;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
+import io.micrometer.core.instrument.simple.SimpleMeterRegistry;
+
@Configuration
public class MicrometerConfig {
diff --git a/src/main/java/br/com/exemplo/dataingestion/adapters/controllers/servers/GenerateLoadController.java b/src/main/java/br/com/exemplo/dataingestion/adapters/controllers/servers/GenerateLoadController.java
deleted file mode 100644
index 395d37e..0000000
--- a/src/main/java/br/com/exemplo/dataingestion/adapters/controllers/servers/GenerateLoadController.java
+++ /dev/null
@@ -1,76 +0,0 @@
-package br.com.exemplo.dataingestion.adapters.controllers.servers;
-
-import br.com.exemplo.dataingestion.adapters.events.entities.DataLancamentoEvent;
-import br.com.exemplo.dataingestion.domain.entities.Lancamento;
-import br.com.exemplo.dataingestion.domain.producer.ProducerService;
-import br.com.exemplo.dataingestion.util.CreateLancamento;
-import io.micrometer.core.instrument.MeterRegistry;
-import io.micrometer.core.instrument.Timer;
-import lombok.RequiredArgsConstructor;
-import lombok.extern.slf4j.Slf4j;
-import org.springframework.beans.factory.annotation.Value;
-import org.springframework.context.ApplicationContext;
-import org.springframework.http.ResponseEntity;
-import org.springframework.kafka.core.KafkaTemplate;
-import org.springframework.stereotype.Component;
-import org.springframework.web.bind.annotation.GetMapping;
-import org.springframework.web.bind.annotation.PathVariable;
-import org.springframework.web.bind.annotation.RestController;
-
-import javax.annotation.PostConstruct;
-import java.applet.Applet;
-import java.lang.reflect.Array;
-import java.util.ArrayList;
-import java.util.List;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
-import java.util.concurrent.ScheduledExecutorService;
-import java.util.concurrent.ThreadPoolExecutor;
-import java.util.concurrent.atomic.AtomicInteger;
-import java.util.concurrent.atomic.AtomicReferenceArray;
-
-@Component
-@RequiredArgsConstructor
-@Slf4j
-public class GenerateLoadController {
-
- private final CreateLancamento createLancamento;
- private final ApplicationContext applicationContext;
-
- private ExecutorService executorService;
- private final List producerServiceList;
- private final MeterRegistry simpleMeterRegistry;
-
- @Value("${processamento.threads.producao:10}")
- private int numeroThreadsProducao;
-
- @PostConstruct
- public void constroiProducer()
- {
- this.executorService = Executors.newFixedThreadPool(numeroThreadsProducao);
- log.debug("Inicializando produtores");
- for(int i=0;i {
- log.info("Inicializando thread {} com {} registros",Thread.currentThread().getId(),numeroItensThread.get());
- createLancamento.create(numeroItensThread.get(), qtdConta,qtdDias,producerService);
- log.info("Finalizando Thread thread {} ",Thread.currentThread().getId(),numeroItensThread.get());
- });
- }
- }
-}
diff --git a/src/main/java/br/com/exemplo/dataingestion/adapters/events/entities/ContaEvent.java b/src/main/java/br/com/exemplo/dataingestion/adapters/events/entities/ContaEvent.java
index bc058e8..23a632e 100644
--- a/src/main/java/br/com/exemplo/dataingestion/adapters/events/entities/ContaEvent.java
+++ b/src/main/java/br/com/exemplo/dataingestion/adapters/events/entities/ContaEvent.java
@@ -13,4 +13,4 @@
public class ContaEvent {
private UUID numeroUnicoConta;
private String codigoSufixoConta;
-}
+}
\ No newline at end of file
diff --git a/src/main/java/br/com/exemplo/dataingestion/adapters/events/entities/DataLancamentoEvent.java b/src/main/java/br/com/exemplo/dataingestion/adapters/events/entities/DataLancamentoEvent.java
index 41e750c..f742ada 100644
--- a/src/main/java/br/com/exemplo/dataingestion/adapters/events/entities/DataLancamentoEvent.java
+++ b/src/main/java/br/com/exemplo/dataingestion/adapters/events/entities/DataLancamentoEvent.java
@@ -1,7 +1,11 @@
package br.com.exemplo.dataingestion.adapters.events.entities;
-import br.com.exemplo.dataingestion.domain.entities.Lancamento;
-import lombok.*;
+import lombok.AllArgsConstructor;
+import lombok.Builder;
+import lombok.Getter;
+import lombok.NoArgsConstructor;
+import lombok.Setter;
+import lombok.ToString;
@Getter
@Setter
diff --git a/src/main/java/br/com/exemplo/dataingestion/adapters/events/entities/LancamentoEvent.java b/src/main/java/br/com/exemplo/dataingestion/adapters/events/entities/LancamentoEvent.java
index 174bf4c..96e8c97 100644
--- a/src/main/java/br/com/exemplo/dataingestion/adapters/events/entities/LancamentoEvent.java
+++ b/src/main/java/br/com/exemplo/dataingestion/adapters/events/entities/LancamentoEvent.java
@@ -1,12 +1,15 @@
package br.com.exemplo.dataingestion.adapters.events.entities;
-import br.com.exemplo.dataingestion.domain.entities.Conta;
-import lombok.*;
-
-import java.util.List;
import java.util.Map;
import java.util.UUID;
+import lombok.AllArgsConstructor;
+import lombok.Builder;
+import lombok.Getter;
+import lombok.NoArgsConstructor;
+import lombok.Setter;
+import lombok.ToString;
+
@Getter
@Setter
@NoArgsConstructor
diff --git a/src/main/java/br/com/exemplo/dataingestion/adapters/events/entities/LoadEntity.java b/src/main/java/br/com/exemplo/dataingestion/adapters/events/entities/LoadEntity.java
new file mode 100644
index 0000000..29e2900
--- /dev/null
+++ b/src/main/java/br/com/exemplo/dataingestion/adapters/events/entities/LoadEntity.java
@@ -0,0 +1,23 @@
+package br.com.exemplo.dataingestion.adapters.events.entities;
+
+import java.time.LocalDate;
+import java.util.UUID;
+
+import lombok.AllArgsConstructor;
+import lombok.Builder;
+import lombok.Getter;
+import lombok.NoArgsConstructor;
+import lombok.Setter;
+import lombok.ToString;
+
+@Getter
+@Setter
+@NoArgsConstructor
+@AllArgsConstructor
+@Builder
+@ToString
+public class LoadEntity {
+ private UUID idConta;
+ private int quantidadeDias;
+ private LocalDate dataFim;
+}
diff --git a/src/main/java/br/com/exemplo/dataingestion/adapters/events/listener/LoadListener.java b/src/main/java/br/com/exemplo/dataingestion/adapters/events/listener/LoadListener.java
new file mode 100644
index 0000000..17ff8d6
--- /dev/null
+++ b/src/main/java/br/com/exemplo/dataingestion/adapters/events/listener/LoadListener.java
@@ -0,0 +1,57 @@
+package br.com.exemplo.dataingestion.adapters.events.listener;
+
+import java.util.concurrent.atomic.AtomicInteger;
+
+import org.springframework.beans.factory.annotation.Value;
+import org.springframework.kafka.annotation.KafkaHandler;
+import org.springframework.kafka.annotation.KafkaListener;
+import org.springframework.stereotype.Component;
+
+import br.com.exemplo.dataingestion.adapters.events.entities.LoadEntity;
+import br.com.exemplo.dataingestion.domain.producer.ProducerService;
+import br.com.exemplo.dataingestion.util.CreateLancamento;
+import io.micrometer.core.instrument.Counter;
+import io.micrometer.core.instrument.Metrics;
+import io.micrometer.core.instrument.Timer;
+import lombok.extern.slf4j.Slf4j;
+
+@Slf4j
+@Component
+@KafkaListener(groupId = "${spring.kafka.consumer.group-id}",
+ topics = "${data.ingestion.consumer.topic}",
+ containerFactory = "kafkaListenerContainerFactory")
+public class LoadListener {
+
+ public LoadListener(CreateLancamento createLancamento, ProducerService producerService, @Value("${POD_NAME}") String podName) {
+ this.createLancamento = createLancamento;
+ this.producerService = producerService;
+ counter = Metrics.globalRegistry.counter("producer.account.items", "Type", podName);
+ timer = Metrics.globalRegistry.timer("producer.account.elapsed", "Type", podName);
+ }
+
+ private final CreateLancamento createLancamento;
+ private final ProducerService producerService;
+
+ private final Counter counter;
+ private final AtomicInteger accounts = new AtomicInteger(0);
+ private final Timer timer;
+
+ @KafkaHandler
+ public void geraEvento(LoadEntity loadEntity) {
+ log.debug("will generate {} records for account {} starting at {}", loadEntity.getQuantidadeDias(), loadEntity.getIdConta(), loadEntity.getDataFim());
+ Timer.Sample sample = Timer.start(Metrics.globalRegistry);
+ for (int i = 0; i < loadEntity.getQuantidadeDias(); i++) {
+ producerService.produce(createLancamento.createByContaAndData(loadEntity.getIdConta(), i, loadEntity.getDataFim()));
+ }
+
+ counter.increment();
+ sample.stop(
+ timer
+ );
+ log.info(
+ "Accounts so far: {}",
+ accounts.incrementAndGet()
+ );
+ }
+
+}
diff --git a/src/main/java/br/com/exemplo/dataingestion/adapters/events/mappers/LancamentoToEventMapperImpl.java b/src/main/java/br/com/exemplo/dataingestion/adapters/events/mappers/LancamentoToEventMapperImpl.java
index 91cde47..af35451 100644
--- a/src/main/java/br/com/exemplo/dataingestion/adapters/events/mappers/LancamentoToEventMapperImpl.java
+++ b/src/main/java/br/com/exemplo/dataingestion/adapters/events/mappers/LancamentoToEventMapperImpl.java
@@ -1,11 +1,11 @@
package br.com.exemplo.dataingestion.adapters.events.mappers;
+import org.springframework.stereotype.Component;
+
import br.com.exemplo.dataingestion.adapters.events.entities.ContaEvent;
import br.com.exemplo.dataingestion.adapters.events.entities.DataLancamentoEvent;
import br.com.exemplo.dataingestion.adapters.events.entities.LancamentoEvent;
-import br.com.exemplo.dataingestion.domain.entities.Conta;
import br.com.exemplo.dataingestion.domain.entities.Lancamento;
-import org.springframework.stereotype.Component;
@Component
public class LancamentoToEventMapperImpl implements EventMapper {
diff --git a/src/main/java/br/com/exemplo/dataingestion/adapters/events/producers/ProducerServiceImpl.java b/src/main/java/br/com/exemplo/dataingestion/adapters/events/producers/ProducerServiceImpl.java
index 078fe78..f62ad22 100644
--- a/src/main/java/br/com/exemplo/dataingestion/adapters/events/producers/ProducerServiceImpl.java
+++ b/src/main/java/br/com/exemplo/dataingestion/adapters/events/producers/ProducerServiceImpl.java
@@ -1,36 +1,64 @@
package br.com.exemplo.dataingestion.adapters.events.producers;
-import br.com.exemplo.dataingestion.adapters.events.entities.DataLancamentoEvent;
-import br.com.exemplo.dataingestion.adapters.events.mappers.EventMapper;
-import br.com.exemplo.dataingestion.domain.entities.Lancamento;
-import br.com.exemplo.dataingestion.domain.producer.ProducerService;
-import io.micrometer.core.instrument.MeterRegistry;
-import io.micrometer.core.instrument.Timer;
-import lombok.RequiredArgsConstructor;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import javax.annotation.PostConstruct;
+
import org.apache.kafka.clients.producer.ProducerRecord;
+import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Scope;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.stereotype.Component;
-@RequiredArgsConstructor
+import br.com.exemplo.dataingestion.adapters.events.entities.DataLancamentoEvent;
+import br.com.exemplo.dataingestion.adapters.events.mappers.EventMapper;
+import br.com.exemplo.dataingestion.domain.entities.Lancamento;
+import br.com.exemplo.dataingestion.domain.producer.ProducerService;
+import io.micrometer.core.instrument.Counter;
+import io.micrometer.core.instrument.Metrics;
+import io.micrometer.core.instrument.Timer;
+import lombok.extern.slf4j.Slf4j;
+
+@Slf4j
@Component
@Scope(value = "prototype")
public class ProducerServiceImpl implements ProducerService {
+ @Value("${POD_NAME}")
+ String podName;
+
@Value("${data.ingestion.producer.topic}")
private String producerTopic;
- private final KafkaTemplate kafkaTemplate;
- private final EventMapper lancamentoDataLancamentoEventEventMapper;
- private final MeterRegistry simpleMeterRegistry;
+ @Autowired
+ private KafkaTemplate kafkaTemplate;
+
+ @Autowired
+ private EventMapper lancamentoDataLancamentoEventEventMapper;
+
+ private Counter counter;
+ private final AtomicInteger records = new AtomicInteger(0);
+ private Timer timer;
+
+ @PostConstruct
+ private void init() {
+ counter = Metrics.globalRegistry.counter("producer.items", "Type", podName);
+ timer = Metrics.globalRegistry.timer("producer.elapsed", "Type", podName);
+ }
+
@Override
public Lancamento produce(Lancamento lancamento) {
- simpleMeterRegistry.counter("kafka.contador","type","producao","thread",String.valueOf(Thread.currentThread().getId())).increment();
- Timer.Sample sample = Timer.start(simpleMeterRegistry);
- ProducerRecord producerRecord = new ProducerRecord(producerTopic, lancamentoDataLancamentoEventEventMapper.convert(lancamento));
- sample.stop(simpleMeterRegistry.timer("kafka.time","type","producao","thread",String.valueOf(Thread.currentThread().getId())));
- kafkaTemplate.send(producerRecord);
+ Timer.Sample sample = Timer.start(Metrics.globalRegistry);
+ kafkaTemplate.send(
+ new ProducerRecord (producerTopic, lancamentoDataLancamentoEventEventMapper.convert(lancamento))
+ );
+ counter.increment();
+ sample.stop(timer);
+ log.debug(
+ "Records so far: {}",
+ records.incrementAndGet()
+ );
return lancamento;
}
}
diff --git a/src/main/java/br/com/exemplo/dataingestion/domain/entities/Lancamento.java b/src/main/java/br/com/exemplo/dataingestion/domain/entities/Lancamento.java
index 8d8e6f3..357bc1a 100644
--- a/src/main/java/br/com/exemplo/dataingestion/domain/entities/Lancamento.java
+++ b/src/main/java/br/com/exemplo/dataingestion/domain/entities/Lancamento.java
@@ -1,11 +1,15 @@
package br.com.exemplo.dataingestion.domain.entities;
-import lombok.*;
-
-import java.util.List;
import java.util.Map;
import java.util.UUID;
+import lombok.AllArgsConstructor;
+import lombok.Builder;
+import lombok.Getter;
+import lombok.NoArgsConstructor;
+import lombok.Setter;
+import lombok.ToString;
+
@Getter
@Setter
@NoArgsConstructor
diff --git a/src/main/java/br/com/exemplo/dataingestion/domain/producer/ProducerService.java b/src/main/java/br/com/exemplo/dataingestion/domain/producer/ProducerService.java
index c2df14b..f3dce7b 100644
--- a/src/main/java/br/com/exemplo/dataingestion/domain/producer/ProducerService.java
+++ b/src/main/java/br/com/exemplo/dataingestion/domain/producer/ProducerService.java
@@ -1,6 +1,5 @@
package br.com.exemplo.dataingestion.domain.producer;
-import br.com.exemplo.dataingestion.adapters.events.entities.DataLancamentoEvent;
import br.com.exemplo.dataingestion.domain.entities.Lancamento;
public interface ProducerService {
diff --git a/src/main/java/br/com/exemplo/dataingestion/util/CreateLancamento.java b/src/main/java/br/com/exemplo/dataingestion/util/CreateLancamento.java
index 8a41649..0e9fbda 100644
--- a/src/main/java/br/com/exemplo/dataingestion/util/CreateLancamento.java
+++ b/src/main/java/br/com/exemplo/dataingestion/util/CreateLancamento.java
@@ -1,28 +1,30 @@
package br.com.exemplo.dataingestion.util;
-import br.com.exemplo.dataingestion.adapters.events.entities.ContaEvent;
-import br.com.exemplo.dataingestion.adapters.events.entities.LancamentoEvent;
-import br.com.exemplo.dataingestion.adapters.events.entities.Location;
-import br.com.exemplo.dataingestion.domain.entities.Conta;
-import br.com.exemplo.dataingestion.domain.entities.Lancamento;
-import br.com.exemplo.dataingestion.domain.producer.ProducerService;
-import com.github.javafaker.Faker;
-import lombok.RequiredArgsConstructor;
-import lombok.SneakyThrows;
-import org.apache.commons.lang3.StringUtils;
-import org.springframework.beans.factory.annotation.Value;
-import org.springframework.stereotype.Component;
-
-import javax.annotation.PostConstruct;
import java.math.BigDecimal;
+import java.time.LocalDate;
+import java.time.LocalTime;
import java.time.OffsetDateTime;
+import java.time.ZoneOffset;
import java.time.format.DateTimeFormatter;
import java.time.temporal.ChronoUnit;
-import java.time.temporal.TemporalUnit;
-import java.util.*;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
-import java.util.concurrent.Future;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Random;
+import java.util.UUID;
+
+import javax.annotation.PostConstruct;
+
+import com.github.javafaker.Faker;
+
+import org.springframework.beans.factory.annotation.Value;
+import org.springframework.stereotype.Component;
+
+import br.com.exemplo.dataingestion.adapters.events.entities.Location;
+import br.com.exemplo.dataingestion.domain.entities.Conta;
+import br.com.exemplo.dataingestion.domain.entities.Lancamento;
+import lombok.RequiredArgsConstructor;
@Component
@RequiredArgsConstructor
@@ -30,7 +32,6 @@ public class CreateLancamento {
private final Faker faker;
- private ExecutorService executorService;
private List lista = new ArrayList<>();
private Random random = new Random();
@Value("${processamento.threads.geracao.massa:10}")
@@ -79,25 +80,36 @@ public Lancamento create123()
.valorLancamento(BigDecimal.valueOf(1000.00).toString())
.build();
}
- public Lancamento createWithParameter(UUID numeroConta, Random random, int dias)
- {
+
+
+ public Lancamento createByContaAndData(UUID idConta, int dias, LocalDate dataFim) {
+ String data = OffsetDateTime.of(
+ dataFim,
+ LocalTime.MIDNIGHT,
+ ZoneOffset.ofHours(-3)
+ ).minus(
+ dias,
+ ChronoUnit.DAYS
+ ).format(
+ DateTimeFormatter.ISO_DATE_TIME
+ );
Map map = new HashMap<>();
map.put("nome",faker.name().fullName());
map.put("estabelecimento",faker.company().name());
map.put("location", Location.builder().lat(faker.address().latitude()).lon(faker.address().longitude()).build());
map.put("categoriaEstabelecimento",faker.commerce().department());
- return Lancamento.builder()
+ Lancamento lancamento = Lancamento.builder()
.codigoMoedaTransacao("986")
.codigoMotivoLancamento(String.valueOf(random.nextInt(999999)))
.codigoTipoOperacao("TEF_CC_CC")
.conta(
Conta.builder()
.codigoSufixoConta("100")
- .numeroUnicoConta(numeroConta)
+ .numeroUnicoConta(idConta)
.build()
)
- .dataContabilLancamento(OffsetDateTime.now().minus(random.nextInt(dias), ChronoUnit.DAYS ).format(DateTimeFormatter.ISO_DATE_TIME))
- .dataLancamento(OffsetDateTime.now().format(DateTimeFormatter.ISO_DATE_TIME))
+ .dataContabilLancamento(data)
+ .dataLancamento(data)
.indicadorLancamentoCompulsorioOcorrencia(random.nextBoolean())
.metadados(map)
.numeroIdentificacaoLancamentoConta(UUID.randomUUID())
@@ -105,18 +117,6 @@ public Lancamento createWithParameter(UUID numeroConta, Random random, int dias)
.textoComplementoLancamento(faker.commerce().productName()+" "+lista.get(random.nextInt(lista.size()-1)))
.valorLancamento(BigDecimal.valueOf(random.nextDouble()).toString())
.build();
+ return lancamento;
}
- @SneakyThrows
- public void create(int quantidadeRegistros, int quantidadeContas,int quantidadeDias,ProducerService producerService)
- {
- for(int i=0;i set = new HashSet<>();
FileOutputStream outputStream = new FileOutputStream("MyFile.txt");
OutputStreamWriter outputStreamWriter = new OutputStreamWriter(outputStream, "UTF-16");
BufferedWriter bufferedWriter = new BufferedWriter(outputStreamWriter);