From fdb0229ee7f34397f8ca2963b590f983ff91bc35 Mon Sep 17 00:00:00 2001 From: edsena Date: Mon, 28 Sep 2020 02:36:56 -0300 Subject: [PATCH 1/6] running in batch/job mode --- README.md | 33 +++++++-------- docker-compose.yaml | 42 +++++++++++++++++++ pom.xml | 5 +-- .../LoadGenerateApplication.java | 12 ++---- .../servers/GenerateLoadController.java | 21 +++------- 5 files changed, 66 insertions(+), 47 deletions(-) create mode 100644 docker-compose.yaml diff --git a/README.md b/README.md index a4588e3..bba2349 100644 --- a/README.md +++ b/README.md @@ -1,26 +1,21 @@ # data-ingestion -bin/kafka-topics.sh --create --bootstrap-server localhost:9092 --replication-factor 1 --partitions 10 --topic ingestion +## spinning up the infrastructure +docker compose -p statement up -bin/kafka-console-consumer.sh --topic ingestion --from-beginning --bootstrap-server localhost:9092 +## view container's IP address +docker ps -q | xargs -n 1 docker inspect --format '{{ .Name }} {{range .NetworkSettings.Networks}} {{.IPAddress}}{{end}}' | sed 's#^/##'; -docker run --rm -e ADV_HOST=localhost -p 3030:3030 -p 8082:8082 -p 9092:9092 -p 2181:2181 -p 8081:8081 -e RUNNING_SAMPLEDATA=0 -e RUNTESTS=0 -e CONNECT_PORT=0 lensesio/fast-data-dev +## create topics +docker exec -it statement_kafka_1 kafka-topics --bootstrap-server localhost:9092 --create --replication-factor 1 --partitions 10 --topic ingestion -docker run -d --name es762 -p 9200:9200 -e "discovery.type=single-node" elasticsearch:7.6.2 +## consume messages +docker exec -it statement_kafka_1 kafka-console-consumer --bootstrap-server localhost:9092 --topic ingestion --from-beginning -curl -X POST "http://localhost:9200/extrato/_search" -d'{"query":{"query_string":{"query": "8bfca9fa-ecdf-4f18-bfe3-e7f3c0360a40"}}}' -H 'Content-type:application/json' +## list consumer groups +docker exec -it statement_kafka_1 kafka-consumer-groups --bootstrap-server localhost:9092 --describe --group data-ingestion - -curl -X POST "http://localhost:9200/extrato/_search" -d'{"query":{"query_string":{"query": "946eccbc-a87b-4aa5-b856-a13ca34ced98"}}}' -H 'Content-type:application/json' - -docker run -d --name grafana -p 3000:3000 grafana/grafana - -docker run -d --name prometheus -p 9090:9090 -v /home/capiau3/Desktop/Projetos/prometheus.yml:/etc/prometheus/prometheus.yml prom/prometheus --config.file=/etc/prometheus/prometheus.yml - -ifconfig -a - -curl -X POST "http://localhost:9200/extrato/_search?pretty" - -bin/kafka-consumer-groups.sh --bootstrap-server localhost:9092 --describe --group data-ingestion - -curl localhost:8080/geraevento/10000/1000000 \ No newline at end of file +## search for data +curl -X POST "http://240.12.0.2:9200/extrato/_search" -d'{"query":{"query_string":{"query": "8bfca9fa-ecdf-4f18-bfe3-e7f3c0360a40"}}}' -H 'Content-type:application/json' +curl -X POST "http://240.12.0.2:9200/extrato/_search" -d'{"query":{"query_string":{"query": "946eccbc-a87b-4aa5-b856-a13ca34ced98"}}}' -H 'Content-type:application/json' +curl -X POST "http://240.12.0.2:9200/extrato/_search?pretty" \ No newline at end of file diff --git a/docker-compose.yaml b/docker-compose.yaml new file mode 100644 index 0000000..de568b3 --- /dev/null +++ b/docker-compose.yaml @@ -0,0 +1,42 @@ +version: "3.8" +services: + kafka: + image: lensesio/fast-data-dev + restart: unless-stopped + environment: + - ADV_HOST=240.12.0.2 + - RUNNING_SAMPLEDATA=0 + - RUNTESTS=0 + - CONNECT_PORT=0 + networks: + kafka: + ipv4_address: 240.12.0.2 + + elasticsearch: + image: elasticsearch:7.7.1 + network_mode: "service:kafka" + restart: unless-stopped + environment: + - discovery.type=single-node + + grafana: + image: grafana/grafana + network_mode: "service:kafka" + restart: unless-stopped + + prometheus: + image: prom/prometheus + network_mode: "service:kafka" + restart: unless-stopped + command: --config.file=/etc/prometheus/prometheus.yml + volumes: + - ${PWD}/prometheus.yml:/etc/prometheus/prometheus.yml + +networks: + public: + external: true + kafka: + driver: bridge + ipam: + config: + - subnet: 240.12.0.0/24 \ No newline at end of file diff --git a/pom.xml b/pom.xml index be6ec43..5ca9711 100644 --- a/pom.xml +++ b/pom.xml @@ -23,10 +23,7 @@ org.springframework.boot spring-boot-starter-actuator - - org.springframework.boot - spring-boot-starter-web - + org.springframework.kafka spring-kafka diff --git a/src/main/java/br/com/exemplo/dataingestion/LoadGenerateApplication.java b/src/main/java/br/com/exemplo/dataingestion/LoadGenerateApplication.java index 9ca4741..e5af372 100644 --- a/src/main/java/br/com/exemplo/dataingestion/LoadGenerateApplication.java +++ b/src/main/java/br/com/exemplo/dataingestion/LoadGenerateApplication.java @@ -7,15 +7,10 @@ import org.springframework.boot.CommandLineRunner; import org.springframework.boot.SpringApplication; import org.springframework.boot.autoconfigure.SpringBootApplication; -import org.springframework.boot.context.event.ApplicationReadyEvent; -import org.springframework.context.event.EventListener; - -import javax.annotation.PostConstruct; - @SpringBootApplication @RequiredArgsConstructor @Slf4j -public class LoadGenerateApplication { +public class LoadGenerateApplication implements CommandLineRunner { @Value("${registros.total:1000000}") private int registro; @@ -32,9 +27,8 @@ public static void main(String[] args) { SpringApplication.run(LoadGenerateApplication.class, args); } - - @EventListener(ApplicationReadyEvent.class) - public void run() throws Exception { + @Override + public void run(String... args) throws Exception { log.info("Iniciando a produção de {} com {} contas e com {} dias retroativos",registro,contas,dias); generateLoadController.geraEvento(contas,registro,dias); log.info("Liberando comando da aplicação"); diff --git a/src/main/java/br/com/exemplo/dataingestion/adapters/controllers/servers/GenerateLoadController.java b/src/main/java/br/com/exemplo/dataingestion/adapters/controllers/servers/GenerateLoadController.java index 395d37e..ace2a41 100644 --- a/src/main/java/br/com/exemplo/dataingestion/adapters/controllers/servers/GenerateLoadController.java +++ b/src/main/java/br/com/exemplo/dataingestion/adapters/controllers/servers/GenerateLoadController.java @@ -1,33 +1,21 @@ package br.com.exemplo.dataingestion.adapters.controllers.servers; -import br.com.exemplo.dataingestion.adapters.events.entities.DataLancamentoEvent; -import br.com.exemplo.dataingestion.domain.entities.Lancamento; import br.com.exemplo.dataingestion.domain.producer.ProducerService; import br.com.exemplo.dataingestion.util.CreateLancamento; import io.micrometer.core.instrument.MeterRegistry; -import io.micrometer.core.instrument.Timer; import lombok.RequiredArgsConstructor; +import lombok.SneakyThrows; import lombok.extern.slf4j.Slf4j; import org.springframework.beans.factory.annotation.Value; import org.springframework.context.ApplicationContext; -import org.springframework.http.ResponseEntity; -import org.springframework.kafka.core.KafkaTemplate; import org.springframework.stereotype.Component; -import org.springframework.web.bind.annotation.GetMapping; -import org.springframework.web.bind.annotation.PathVariable; -import org.springframework.web.bind.annotation.RestController; import javax.annotation.PostConstruct; -import java.applet.Applet; -import java.lang.reflect.Array; -import java.util.ArrayList; import java.util.List; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; -import java.util.concurrent.ScheduledExecutorService; -import java.util.concurrent.ThreadPoolExecutor; +import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicInteger; -import java.util.concurrent.atomic.AtomicReferenceArray; @Component @RequiredArgsConstructor @@ -55,7 +43,8 @@ public void constroiProducer() } } - public void geraEvento(@PathVariable("qtdConta") int qtdConta,@PathVariable("qtdReg") int qtdReg,@PathVariable("qtdDias") int qtdDias) + @SneakyThrows + public void geraEvento(int qtdConta, int qtdReg, int qtdDias) { AtomicInteger numeroItensThread = new AtomicInteger(qtdReg/numeroThreadsProducao); @@ -72,5 +61,7 @@ public void geraEvento(@PathVariable("qtdConta") int qtdConta,@PathVariable("qtd log.info("Finalizando Thread thread {} ",Thread.currentThread().getId(),numeroItensThread.get()); }); } + executorService.shutdown(); + executorService.awaitTermination(Long.MAX_VALUE, TimeUnit.DAYS); } } From 05dd65eb4cd242109fd2e3bed6bfbbb2d9b51dea Mon Sep 17 00:00:00 2001 From: Bruno Boassi Date: Tue, 29 Sep 2020 14:45:19 -0300 Subject: [PATCH 2/6] =?UTF-8?q?Consumo=20e=20gera=C3=A7=C3=A3o?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../LoadGenerateApplication.java | 18 -------- .../adapters/beans/KafkaConfig.java | 15 ++++--- .../adapters/events/entities/LoadEntity.java | 16 +++++++ .../events/listener/LoadListener.java | 45 +++++++++++++++++++ .../dataingestion/util/CreateLancamento.java | 25 +++-------- src/main/resources/application.properties | 23 +++++++--- 6 files changed, 94 insertions(+), 48 deletions(-) create mode 100644 src/main/java/br/com/exemplo/dataingestion/adapters/events/entities/LoadEntity.java create mode 100644 src/main/java/br/com/exemplo/dataingestion/adapters/events/listener/LoadListener.java diff --git a/src/main/java/br/com/exemplo/dataingestion/LoadGenerateApplication.java b/src/main/java/br/com/exemplo/dataingestion/LoadGenerateApplication.java index 9ca4741..f964b9b 100644 --- a/src/main/java/br/com/exemplo/dataingestion/LoadGenerateApplication.java +++ b/src/main/java/br/com/exemplo/dataingestion/LoadGenerateApplication.java @@ -17,26 +17,8 @@ @Slf4j public class LoadGenerateApplication { - @Value("${registros.total:1000000}") - private int registro; - - @Value("${contas.total:1000}") - private int contas; - - @Value("${dias.total:90}") - private int dias; - - private final GenerateLoadController generateLoadController; public static void main(String[] args) { SpringApplication.run(LoadGenerateApplication.class, args); } - - - @EventListener(ApplicationReadyEvent.class) - public void run() throws Exception { - log.info("Iniciando a produção de {} com {} contas e com {} dias retroativos",registro,contas,dias); - generateLoadController.geraEvento(contas,registro,dias); - log.info("Liberando comando da aplicação"); - } } diff --git a/src/main/java/br/com/exemplo/dataingestion/adapters/beans/KafkaConfig.java b/src/main/java/br/com/exemplo/dataingestion/adapters/beans/KafkaConfig.java index de8230b..7cb0d26 100644 --- a/src/main/java/br/com/exemplo/dataingestion/adapters/beans/KafkaConfig.java +++ b/src/main/java/br/com/exemplo/dataingestion/adapters/beans/KafkaConfig.java @@ -3,6 +3,7 @@ import br.com.exemplo.dataingestion.adapters.events.entities.DataLancamentoEvent; import org.apache.kafka.clients.producer.ProducerConfig; import org.springframework.beans.factory.annotation.Qualifier; +import org.springframework.beans.factory.annotation.Value; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; import org.springframework.context.annotation.Scope; @@ -17,11 +18,15 @@ @Configuration public class KafkaConfig { + @Value("${spring.kafka.listener.concurrency}") + private int concurrency; + + @Bean - @Scope(value = "prototype") - public KafkaTemplate kafkaTemplate1(ProducerFactory producerFactory){ - Map map = new HashMap<>(); - map.put(ProducerConfig.CLIENT_ID_CONFIG,UUID.randomUUID().toString()); - return new KafkaTemplate(producerFactory,false,map); + public ConcurrentKafkaListenerContainerFactory kafkaListenerContainerFactory(ConsumerFactory consumerFactory){ + ConcurrentKafkaListenerContainerFactory factory = new ConcurrentKafkaListenerContainerFactory(); + factory.setConsumerFactory(consumerFactory); + factory.setConcurrency(concurrency); + return factory; } } diff --git a/src/main/java/br/com/exemplo/dataingestion/adapters/events/entities/LoadEntity.java b/src/main/java/br/com/exemplo/dataingestion/adapters/events/entities/LoadEntity.java new file mode 100644 index 0000000..563e906 --- /dev/null +++ b/src/main/java/br/com/exemplo/dataingestion/adapters/events/entities/LoadEntity.java @@ -0,0 +1,16 @@ +package br.com.exemplo.dataingestion.adapters.events.entities; + +import lombok.*; + +import java.util.UUID; + +@Getter +@Setter +@NoArgsConstructor +@AllArgsConstructor +@Builder +@ToString +public class LoadEntity { + private UUID idConta; + private int quantidadeDias; +} diff --git a/src/main/java/br/com/exemplo/dataingestion/adapters/events/listener/LoadListener.java b/src/main/java/br/com/exemplo/dataingestion/adapters/events/listener/LoadListener.java new file mode 100644 index 0000000..cc4dd52 --- /dev/null +++ b/src/main/java/br/com/exemplo/dataingestion/adapters/events/listener/LoadListener.java @@ -0,0 +1,45 @@ +package br.com.exemplo.dataingestion.adapters.events.listener; + +import br.com.exemplo.dataingestion.adapters.events.entities.LoadEntity; +import br.com.exemplo.dataingestion.domain.producer.ProducerService; +import br.com.exemplo.dataingestion.util.CreateLancamento; +import io.micrometer.core.instrument.MeterRegistry; +import lombok.RequiredArgsConstructor; +import lombok.extern.slf4j.Slf4j; +import org.springframework.beans.factory.annotation.Value; +import org.springframework.context.ApplicationContext; +import org.springframework.kafka.annotation.KafkaHandler; +import org.springframework.kafka.support.Acknowledgment; +import org.springframework.stereotype.Component; +import org.springframework.web.bind.annotation.PathVariable; + +import javax.annotation.PostConstruct; +import java.util.List; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.atomic.AtomicInteger; + +@Component +@RequiredArgsConstructor +@Slf4j + +public class LoadListener { + + private final CreateLancamento createLancamento; + private final ApplicationContext applicationContext; + + private final MeterRegistry simpleMeterRegistry; + private ProducerService producerService; + + + @KafkaHandler + public void geraEvento(LoadEntity loadEntity, Acknowledgment acknowledgment) + { + for (int i=0;i map = new HashMap<>(); map.put("nome",faker.name().fullName()); map.put("estabelecimento",faker.company().name()); @@ -93,11 +95,11 @@ public Lancamento createWithParameter(UUID numeroConta, Random random, int dias) .conta( Conta.builder() .codigoSufixoConta("100") - .numeroUnicoConta(numeroConta) + .numeroUnicoConta(idConta) .build() ) - .dataContabilLancamento(OffsetDateTime.now().minus(random.nextInt(dias), ChronoUnit.DAYS ).format(DateTimeFormatter.ISO_DATE_TIME)) - .dataLancamento(OffsetDateTime.now().format(DateTimeFormatter.ISO_DATE_TIME)) + .dataContabilLancamento(data) + .dataLancamento(data) .indicadorLancamentoCompulsorioOcorrencia(random.nextBoolean()) .metadados(map) .numeroIdentificacaoLancamentoConta(UUID.randomUUID()) @@ -106,17 +108,4 @@ public Lancamento createWithParameter(UUID numeroConta, Random random, int dias) .valorLancamento(BigDecimal.valueOf(random.nextDouble()).toString()) .build(); } - @SneakyThrows - public void create(int quantidadeRegistros, int quantidadeContas,int quantidadeDias,ProducerService producerService) - { - for(int i=0;i Date: Tue, 29 Sep 2020 18:45:49 -0300 Subject: [PATCH 3/6] =?UTF-8?q?Vers=C3=A3o=20final=20de=20teste?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- README.md | 34 ++++----- docker-compose.yaml | 42 ++++++++++ .../LoadGenerateApplication.java | 7 -- .../adapters/beans/KafkaConfig.java | 3 +- .../servers/GenerateLoadController.java | 76 ------------------- .../events/listener/LoadListener.java | 21 +++-- src/main/resources/application.properties | 16 ++-- 7 files changed, 80 insertions(+), 119 deletions(-) create mode 100644 docker-compose.yaml delete mode 100644 src/main/java/br/com/exemplo/dataingestion/adapters/controllers/servers/GenerateLoadController.java diff --git a/README.md b/README.md index a4588e3..500222b 100644 --- a/README.md +++ b/README.md @@ -1,26 +1,22 @@ # data-ingestion -bin/kafka-topics.sh --create --bootstrap-server localhost:9092 --replication-factor 1 --partitions 10 --topic ingestion +## spinning up the infrastructure +docker-compose -p statement up -bin/kafka-console-consumer.sh --topic ingestion --from-beginning --bootstrap-server localhost:9092 +## view container's IP address +docker ps -q | xargs -n 1 docker inspect --format '{{ .Name }} {{range .NetworkSettings.Networks}} {{.IPAddress}}{{end}}' | sed 's#^/##'; -docker run --rm -e ADV_HOST=localhost -p 3030:3030 -p 8082:8082 -p 9092:9092 -p 2181:2181 -p 8081:8081 -e RUNNING_SAMPLEDATA=0 -e RUNTESTS=0 -e CONNECT_PORT=0 lensesio/fast-data-dev +## create topics +docker exec -it statement_kafka_1 kafka-topics --bootstrap-server localhost:9092 --create --replication-factor 1 --partitions 10 --topic load +docker exec -it statement_kafka_1 kafka-topics --bootstrap-server localhost:9092 --create --replication-factor 1 --partitions 10 --topic ingestion -docker run -d --name es762 -p 9200:9200 -e "discovery.type=single-node" elasticsearch:7.6.2 +## consume messages +docker exec -it statement_kafka_1 kafka-console-consumer --bootstrap-server localhost:9092 --topic ingestion --from-beginning -curl -X POST "http://localhost:9200/extrato/_search" -d'{"query":{"query_string":{"query": "8bfca9fa-ecdf-4f18-bfe3-e7f3c0360a40"}}}' -H 'Content-type:application/json' +## list consumer groups +docker exec -it statement_kafka_1 kafka-consumer-groups --bootstrap-server localhost:9092 --describe --group data-ingestion - -curl -X POST "http://localhost:9200/extrato/_search" -d'{"query":{"query_string":{"query": "946eccbc-a87b-4aa5-b856-a13ca34ced98"}}}' -H 'Content-type:application/json' - -docker run -d --name grafana -p 3000:3000 grafana/grafana - -docker run -d --name prometheus -p 9090:9090 -v /home/capiau3/Desktop/Projetos/prometheus.yml:/etc/prometheus/prometheus.yml prom/prometheus --config.file=/etc/prometheus/prometheus.yml - -ifconfig -a - -curl -X POST "http://localhost:9200/extrato/_search?pretty" - -bin/kafka-consumer-groups.sh --bootstrap-server localhost:9092 --describe --group data-ingestion - -curl localhost:8080/geraevento/10000/1000000 \ No newline at end of file +## search for data +curl -X POST "http://240.12.0.2:9200/extrato/_search" -d'{"query":{"query_string":{"query": "8bfca9fa-ecdf-4f18-bfe3-e7f3c0360a40"}}}' -H 'Content-type:application/json' +curl -X POST "http://240.12.0.2:9200/extrato/_search" -d'{"query":{"query_string":{"query": "946eccbc-a87b-4aa5-b856-a13ca34ced98"}}}' -H 'Content-type:application/json' +curl -X POST "http://240.12.0.2:9200/extrato/_search?pretty" \ No newline at end of file diff --git a/docker-compose.yaml b/docker-compose.yaml new file mode 100644 index 0000000..044e92b --- /dev/null +++ b/docker-compose.yaml @@ -0,0 +1,42 @@ +version: "3.3" +services: + kafka: + image: lensesio/fast-data-dev + restart: unless-stopped + environment: + - ADV_HOST=240.12.0.2 + - RUNNING_SAMPLEDATA=0 + - RUNTESTS=0 + - CONNECT_PORT=0 + networks: + kafka: + ipv4_address: 240.12.0.2 + + elasticsearch: + image: elasticsearch:7.7.1 + network_mode: "service:kafka" + restart: unless-stopped + environment: + - discovery.type=single-node + + grafana: + image: grafana/grafana + network_mode: "service:kafka" + restart: unless-stopped + + prometheus: + image: prom/prometheus + network_mode: "service:kafka" + restart: unless-stopped + command: --config.file=/etc/prometheus/prometheus.yml + volumes: + - ${PWD}/prometheus.yml:/etc/prometheus/prometheus.yml + +networks: + public: + external: true + kafka: + driver: bridge + ipam: + config: + - subnet: 240.12.0.0/24 \ No newline at end of file diff --git a/src/main/java/br/com/exemplo/dataingestion/LoadGenerateApplication.java b/src/main/java/br/com/exemplo/dataingestion/LoadGenerateApplication.java index f964b9b..d54615f 100644 --- a/src/main/java/br/com/exemplo/dataingestion/LoadGenerateApplication.java +++ b/src/main/java/br/com/exemplo/dataingestion/LoadGenerateApplication.java @@ -1,16 +1,9 @@ package br.com.exemplo.dataingestion; -import br.com.exemplo.dataingestion.adapters.controllers.servers.GenerateLoadController; import lombok.RequiredArgsConstructor; import lombok.extern.slf4j.Slf4j; -import org.springframework.beans.factory.annotation.Value; -import org.springframework.boot.CommandLineRunner; import org.springframework.boot.SpringApplication; import org.springframework.boot.autoconfigure.SpringBootApplication; -import org.springframework.boot.context.event.ApplicationReadyEvent; -import org.springframework.context.event.EventListener; - -import javax.annotation.PostConstruct; @SpringBootApplication @RequiredArgsConstructor diff --git a/src/main/java/br/com/exemplo/dataingestion/adapters/beans/KafkaConfig.java b/src/main/java/br/com/exemplo/dataingestion/adapters/beans/KafkaConfig.java index 7cb0d26..f334bab 100644 --- a/src/main/java/br/com/exemplo/dataingestion/adapters/beans/KafkaConfig.java +++ b/src/main/java/br/com/exemplo/dataingestion/adapters/beans/KafkaConfig.java @@ -1,6 +1,7 @@ package br.com.exemplo.dataingestion.adapters.beans; import br.com.exemplo.dataingestion.adapters.events.entities.DataLancamentoEvent; +import br.com.exemplo.dataingestion.adapters.events.entities.LoadEntity; import org.apache.kafka.clients.producer.ProducerConfig; import org.springframework.beans.factory.annotation.Qualifier; import org.springframework.beans.factory.annotation.Value; @@ -23,7 +24,7 @@ public class KafkaConfig { @Bean - public ConcurrentKafkaListenerContainerFactory kafkaListenerContainerFactory(ConsumerFactory consumerFactory){ + public ConcurrentKafkaListenerContainerFactory kafkaListenerContainerFactory(ConsumerFactory consumerFactory){ ConcurrentKafkaListenerContainerFactory factory = new ConcurrentKafkaListenerContainerFactory(); factory.setConsumerFactory(consumerFactory); factory.setConcurrency(concurrency); diff --git a/src/main/java/br/com/exemplo/dataingestion/adapters/controllers/servers/GenerateLoadController.java b/src/main/java/br/com/exemplo/dataingestion/adapters/controllers/servers/GenerateLoadController.java deleted file mode 100644 index 395d37e..0000000 --- a/src/main/java/br/com/exemplo/dataingestion/adapters/controllers/servers/GenerateLoadController.java +++ /dev/null @@ -1,76 +0,0 @@ -package br.com.exemplo.dataingestion.adapters.controllers.servers; - -import br.com.exemplo.dataingestion.adapters.events.entities.DataLancamentoEvent; -import br.com.exemplo.dataingestion.domain.entities.Lancamento; -import br.com.exemplo.dataingestion.domain.producer.ProducerService; -import br.com.exemplo.dataingestion.util.CreateLancamento; -import io.micrometer.core.instrument.MeterRegistry; -import io.micrometer.core.instrument.Timer; -import lombok.RequiredArgsConstructor; -import lombok.extern.slf4j.Slf4j; -import org.springframework.beans.factory.annotation.Value; -import org.springframework.context.ApplicationContext; -import org.springframework.http.ResponseEntity; -import org.springframework.kafka.core.KafkaTemplate; -import org.springframework.stereotype.Component; -import org.springframework.web.bind.annotation.GetMapping; -import org.springframework.web.bind.annotation.PathVariable; -import org.springframework.web.bind.annotation.RestController; - -import javax.annotation.PostConstruct; -import java.applet.Applet; -import java.lang.reflect.Array; -import java.util.ArrayList; -import java.util.List; -import java.util.concurrent.ExecutorService; -import java.util.concurrent.Executors; -import java.util.concurrent.ScheduledExecutorService; -import java.util.concurrent.ThreadPoolExecutor; -import java.util.concurrent.atomic.AtomicInteger; -import java.util.concurrent.atomic.AtomicReferenceArray; - -@Component -@RequiredArgsConstructor -@Slf4j -public class GenerateLoadController { - - private final CreateLancamento createLancamento; - private final ApplicationContext applicationContext; - - private ExecutorService executorService; - private final List producerServiceList; - private final MeterRegistry simpleMeterRegistry; - - @Value("${processamento.threads.producao:10}") - private int numeroThreadsProducao; - - @PostConstruct - public void constroiProducer() - { - this.executorService = Executors.newFixedThreadPool(numeroThreadsProducao); - log.debug("Inicializando produtores"); - for(int i=0;i { - log.info("Inicializando thread {} com {} registros",Thread.currentThread().getId(),numeroItensThread.get()); - createLancamento.create(numeroItensThread.get(), qtdConta,qtdDias,producerService); - log.info("Finalizando Thread thread {} ",Thread.currentThread().getId(),numeroItensThread.get()); - }); - } - } -} diff --git a/src/main/java/br/com/exemplo/dataingestion/adapters/events/listener/LoadListener.java b/src/main/java/br/com/exemplo/dataingestion/adapters/events/listener/LoadListener.java index cc4dd52..e47212d 100644 --- a/src/main/java/br/com/exemplo/dataingestion/adapters/events/listener/LoadListener.java +++ b/src/main/java/br/com/exemplo/dataingestion/adapters/events/listener/LoadListener.java @@ -6,15 +6,21 @@ import io.micrometer.core.instrument.MeterRegistry; import lombok.RequiredArgsConstructor; import lombok.extern.slf4j.Slf4j; +import org.apache.kafka.clients.consumer.ConsumerRecord; import org.springframework.beans.factory.annotation.Value; import org.springframework.context.ApplicationContext; import org.springframework.kafka.annotation.KafkaHandler; +import org.springframework.kafka.annotation.KafkaListener; import org.springframework.kafka.support.Acknowledgment; +import org.springframework.messaging.handler.annotation.MessageExceptionHandler; +import org.springframework.messaging.handler.annotation.Payload; import org.springframework.stereotype.Component; import org.springframework.web.bind.annotation.PathVariable; +import org.springframework.web.bind.annotation.RequestBody; import javax.annotation.PostConstruct; import java.util.List; +import java.util.Objects; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.atomic.AtomicInteger; @@ -22,7 +28,9 @@ @Component @RequiredArgsConstructor @Slf4j - +@KafkaListener(groupId = "${spring.kafka.consumer.group-id}", + topics = "${data.ingestion.consumer.topic}", + containerFactory = "kafkaListenerContainerFactory") public class LoadListener { private final CreateLancamento createLancamento; @@ -33,13 +41,10 @@ public class LoadListener { @KafkaHandler - public void geraEvento(LoadEntity loadEntity, Acknowledgment acknowledgment) - { - for (int i=0;i Date: Tue, 29 Sep 2020 18:46:36 -0300 Subject: [PATCH 4/6] =?UTF-8?q?Vers=C3=A3o=20final=20de=20teste?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- src/main/resources/application.properties | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/src/main/resources/application.properties b/src/main/resources/application.properties index 0f8219f..596a274 100644 --- a/src/main/resources/application.properties +++ b/src/main/resources/application.properties @@ -2,9 +2,9 @@ server.port=8080 spring.kafka.producer.bootstrap-servers=${KAFKA_BOOTSTRAP} spring.kafka.producer.key-serializer=org.apache.kafka.common.serialization.StringSerializer spring.kafka.producer.value-serializer=org.springframework.kafka.support.serializer.JsonSerializer -#spring.kafka.producer.ssl.trust-store-location=file:kafka.client.truststore.jks -#spring.kafka.producer.ssl.protocol=SSL -#spring.kafka.properties.security.protocol=SSL +spring.kafka.producer.ssl.trust-store-location=file:kafka.client.truststore.jks +spring.kafka.producer.ssl.protocol=SSL +spring.kafka.properties.security.protocol=SSL data.ingestion.producer.topic=${KAFKA_PRODUCER_TOPIC} management.endpoints.web.exposure.include=info,health,prometheus,metrics @@ -20,8 +20,8 @@ spring.kafka.consumer.properties.spring.json.trusted.packages=* spring.kafka.listener.ack-mode=BATCH spring.kafka.listener.concurrency=${KAFKA_CONCURRENCY:10} data.ingestion.consumer.topic=${KAFKA_CONSUMER_TOPIC} -#spring.kafka.consumer.ssl.protocol=SSL -#spring.kafka.consumer.ssl.truststore-location=file:kafka.client.truststore.jks +spring.kafka.consumer.ssl.protocol=SSL +spring.kafka.consumer.ssl.truststore-location=file:kafka.client.truststore.jks logging.dir=logs logging.filename=application.log \ No newline at end of file From 08e15c48ecc5c127efae4c13285de8af90210810 Mon Sep 17 00:00:00 2001 From: edsena Date: Sun, 1 Nov 2020 01:05:21 -0300 Subject: [PATCH 5/6] ajustes para o teste --- Dockerfile | 6 +- README.md | 22 ----- docker-compose.yaml | 42 --------- load-generate.yaml | 61 +++++++++++++ pom.xml | 89 +++++++++++-------- prometheus.yml | 26 ------ .../LoadGenerateApplication.java | 7 +- .../adapters/beans/KafkaConfig.java | 13 +-- .../adapters/beans/MicrometerConfig.java | 3 +- .../events/entities/DataLancamentoEvent.java | 8 +- .../events/entities/LancamentoEvent.java | 11 ++- .../events/listener/LoadListener.java | 59 ++++++------ .../mappers/LancamentoToEventMapperImpl.java | 4 +- .../events/producers/ProducerServiceImpl.java | 60 +++++++++---- .../domain/entities/Lancamento.java | 10 ++- .../domain/producer/ProducerService.java | 1 - .../dataingestion/util/CreateLancamento.java | 41 +++++---- src/main/resources/application-aws.properties | 41 +++++++++ .../resources/application-local.properties | 39 ++++++++ .../resources/application-remote.properties | 44 +++++++++ src/main/resources/application.properties | 27 ------ .../util/CreateLancamentoTest.java | 19 ++-- 22 files changed, 371 insertions(+), 262 deletions(-) delete mode 100644 README.md delete mode 100644 docker-compose.yaml create mode 100644 load-generate.yaml delete mode 100644 prometheus.yml create mode 100644 src/main/resources/application-aws.properties create mode 100644 src/main/resources/application-local.properties create mode 100644 src/main/resources/application-remote.properties delete mode 100644 src/main/resources/application.properties diff --git a/Dockerfile b/Dockerfile index 76cecc5..110287e 100644 --- a/Dockerfile +++ b/Dockerfile @@ -1,7 +1,5 @@ -FROM adoptopenjdk/openjdk11:jdk-11.0.2.9-slim +FROM adoptopenjdk/openjdk11:jre-11.0.8_10-alpine WORKDIR /opt -ENV PORT 8080 -EXPOSE 8080 COPY kafka.client.truststore.jks /opt/kafka.client.truststore.jks COPY target/*.jar /opt/app.jar -ENTRYPOINT exec java $JAVA_OPTS -jar app.jar \ No newline at end of file +ENTRYPOINT exec java $JAVA_OPTS -jar app.jar --spring.profiles.active=aws --spring.config.location=/tmp/ \ No newline at end of file diff --git a/README.md b/README.md deleted file mode 100644 index 500222b..0000000 --- a/README.md +++ /dev/null @@ -1,22 +0,0 @@ -# data-ingestion - -## spinning up the infrastructure -docker-compose -p statement up - -## view container's IP address -docker ps -q | xargs -n 1 docker inspect --format '{{ .Name }} {{range .NetworkSettings.Networks}} {{.IPAddress}}{{end}}' | sed 's#^/##'; - -## create topics -docker exec -it statement_kafka_1 kafka-topics --bootstrap-server localhost:9092 --create --replication-factor 1 --partitions 10 --topic load -docker exec -it statement_kafka_1 kafka-topics --bootstrap-server localhost:9092 --create --replication-factor 1 --partitions 10 --topic ingestion - -## consume messages -docker exec -it statement_kafka_1 kafka-console-consumer --bootstrap-server localhost:9092 --topic ingestion --from-beginning - -## list consumer groups -docker exec -it statement_kafka_1 kafka-consumer-groups --bootstrap-server localhost:9092 --describe --group data-ingestion - -## search for data -curl -X POST "http://240.12.0.2:9200/extrato/_search" -d'{"query":{"query_string":{"query": "8bfca9fa-ecdf-4f18-bfe3-e7f3c0360a40"}}}' -H 'Content-type:application/json' -curl -X POST "http://240.12.0.2:9200/extrato/_search" -d'{"query":{"query_string":{"query": "946eccbc-a87b-4aa5-b856-a13ca34ced98"}}}' -H 'Content-type:application/json' -curl -X POST "http://240.12.0.2:9200/extrato/_search?pretty" \ No newline at end of file diff --git a/docker-compose.yaml b/docker-compose.yaml deleted file mode 100644 index 044e92b..0000000 --- a/docker-compose.yaml +++ /dev/null @@ -1,42 +0,0 @@ -version: "3.3" -services: - kafka: - image: lensesio/fast-data-dev - restart: unless-stopped - environment: - - ADV_HOST=240.12.0.2 - - RUNNING_SAMPLEDATA=0 - - RUNTESTS=0 - - CONNECT_PORT=0 - networks: - kafka: - ipv4_address: 240.12.0.2 - - elasticsearch: - image: elasticsearch:7.7.1 - network_mode: "service:kafka" - restart: unless-stopped - environment: - - discovery.type=single-node - - grafana: - image: grafana/grafana - network_mode: "service:kafka" - restart: unless-stopped - - prometheus: - image: prom/prometheus - network_mode: "service:kafka" - restart: unless-stopped - command: --config.file=/etc/prometheus/prometheus.yml - volumes: - - ${PWD}/prometheus.yml:/etc/prometheus/prometheus.yml - -networks: - public: - external: true - kafka: - driver: bridge - ipam: - config: - - subnet: 240.12.0.0/24 \ No newline at end of file diff --git a/load-generate.yaml b/load-generate.yaml new file mode 100644 index 0000000..cc3a02e --- /dev/null +++ b/load-generate.yaml @@ -0,0 +1,61 @@ +apiVersion: apps/v1 +kind: Deployment +metadata: + name: load-generate + labels: + app: load-generate + namespace: load-generate +spec: + replicas: 0 + selector: + matchLabels: + app: load-generate + template: + metadata: + labels: + app: load-generate + spec: + serviceAccountName: statement-demo + containers: + - name: load-generate + image: 837108680928.dkr.ecr.us-east-1.amazonaws.com/load-generate:latest + imagePullPolicy: Always + volumeMounts: + - mountPath: /tmp/application-aws.properties + name: config + subPath: application-aws.properties + resources: + limits: + memory: "3Gi" + cpu: 2 + requests: + memory: "3Gi" + cpu: 2 + env: + - name: JAVA_OPTS + value: "-XX:MaxRAMPercentage=90.0" + - name: KAFKA_BOOTSTRAP + valueFrom: + configMapKeyRef: + name: kafka + key: bootstrap + - name: KAFKA_GROUP_ID + value: "load-generate" + - name: KAFKA_MAX_POOL_RECORDS + value: "1" + - name: KAFKA_CONCURRENCY + value: "10" + - name: KAFKA_PRODUCER_TOPIC + value: "generate" + - name: KAFKA_CONSUMER_TOPIC + value: "load" + - name: CLOUDWATCH_NAMESPACE + value: "statement12" + - name: POD_NAME + valueFrom: + fieldRef: + fieldPath: metadata.name + volumes: + - configMap: + name: application.properties + name: config diff --git a/pom.xml b/pom.xml index be6ec43..2611651 100644 --- a/pom.xml +++ b/pom.xml @@ -15,7 +15,8 @@ Demo project for Spring Boot - 1.8 + 11 + 1.11.887 @@ -23,15 +24,10 @@ org.springframework.boot spring-boot-starter-actuator - - org.springframework.boot - spring-boot-starter-web - org.springframework.kafka spring-kafka - org.projectlombok lombok @@ -42,50 +38,71 @@ commons-lang3 3.11 - - io.micrometer - micrometer-registry-prometheus - com.github.javafaker javafaker 1.0.2 - - org.springframework.boot - spring-boot-starter-test - test - - - org.junit.vintage - junit-vintage-engine - - - org.springframework.kafka spring-kafka-test test - - junit - junit - test - - junit - junit - test + io.micrometer + micrometer-core + 1.5.5 + + + com.amazonaws + aws-java-sdk-bom + ${aws.sdk.version} + pom + + + com.amazonaws + aws-java-sdk-sts + ${aws.sdk.version} + + + com.amazonaws + aws-java-sdk-cloudwatch + ${aws.sdk.version} + + + org.springframework.cloud + spring-cloud-starter-aws + 2.2.4.RELEASE + + + io.micrometer + micrometer-registry-cloudwatch + 1.5.5 - - - - org.springframework.boot - spring-boot-maven-plugin - - + + + org.apache.maven.plugins + maven-jar-plugin + + + **/application*.properties + + + + + org.springframework.boot + spring-boot-maven-plugin + + + + repackage + + + + + diff --git a/prometheus.yml b/prometheus.yml deleted file mode 100644 index f19aa03..0000000 --- a/prometheus.yml +++ /dev/null @@ -1,26 +0,0 @@ -# my global config -global: - scrape_interval: 15s # Set the scrape interval to every 15 seconds. Default is every 1 minute. - evaluation_interval: 15s # Evaluate rules every 15 seconds. The default is every 1 minute. - # scrape_timeout is set to the global default (10s). - -# Load rules once and periodically evaluate them according to the global 'evaluation_interval'. -rule_files: - # - "first_rules.yml" - # - "second_rules.yml" - -# A scrape configuration containing exactly one endpoint to scrape: -# Here it's Prometheus itself. -scrape_configs: - # The job name is added as a label `job=` to any timeseries scraped from this config. - - job_name: 'prometheus' - # metrics_path defaults to '/metrics' - # scheme defaults to 'http'. - static_configs: - - targets: ['127.0.0.1:9090'] - - - job_name: 'spring-actuator' - metrics_path: '/actuator/prometheus' - scrape_interval: 5s - static_configs: - - targets: ['192.168.100.123:8080'] diff --git a/src/main/java/br/com/exemplo/dataingestion/LoadGenerateApplication.java b/src/main/java/br/com/exemplo/dataingestion/LoadGenerateApplication.java index d54615f..b9635c6 100644 --- a/src/main/java/br/com/exemplo/dataingestion/LoadGenerateApplication.java +++ b/src/main/java/br/com/exemplo/dataingestion/LoadGenerateApplication.java @@ -1,16 +1,13 @@ package br.com.exemplo.dataingestion; -import lombok.RequiredArgsConstructor; -import lombok.extern.slf4j.Slf4j; import org.springframework.boot.SpringApplication; import org.springframework.boot.autoconfigure.SpringBootApplication; +import lombok.RequiredArgsConstructor; + @SpringBootApplication @RequiredArgsConstructor -@Slf4j public class LoadGenerateApplication { - - public static void main(String[] args) { SpringApplication.run(LoadGenerateApplication.class, args); } diff --git a/src/main/java/br/com/exemplo/dataingestion/adapters/beans/KafkaConfig.java b/src/main/java/br/com/exemplo/dataingestion/adapters/beans/KafkaConfig.java index f334bab..41b0ad5 100644 --- a/src/main/java/br/com/exemplo/dataingestion/adapters/beans/KafkaConfig.java +++ b/src/main/java/br/com/exemplo/dataingestion/adapters/beans/KafkaConfig.java @@ -1,21 +1,12 @@ package br.com.exemplo.dataingestion.adapters.beans; -import br.com.exemplo.dataingestion.adapters.events.entities.DataLancamentoEvent; -import br.com.exemplo.dataingestion.adapters.events.entities.LoadEntity; -import org.apache.kafka.clients.producer.ProducerConfig; -import org.springframework.beans.factory.annotation.Qualifier; import org.springframework.beans.factory.annotation.Value; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; -import org.springframework.context.annotation.Scope; import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory; import org.springframework.kafka.core.ConsumerFactory; -import org.springframework.kafka.core.KafkaTemplate; -import org.springframework.kafka.core.ProducerFactory; -import java.util.HashMap; -import java.util.Map; -import java.util.UUID; +import br.com.exemplo.dataingestion.adapters.events.entities.LoadEntity; @Configuration public class KafkaConfig { @@ -25,7 +16,7 @@ public class KafkaConfig { @Bean public ConcurrentKafkaListenerContainerFactory kafkaListenerContainerFactory(ConsumerFactory consumerFactory){ - ConcurrentKafkaListenerContainerFactory factory = new ConcurrentKafkaListenerContainerFactory(); + ConcurrentKafkaListenerContainerFactory factory = new ConcurrentKafkaListenerContainerFactory(); factory.setConsumerFactory(consumerFactory); factory.setConcurrency(concurrency); return factory; diff --git a/src/main/java/br/com/exemplo/dataingestion/adapters/beans/MicrometerConfig.java b/src/main/java/br/com/exemplo/dataingestion/adapters/beans/MicrometerConfig.java index e6d9c98..7ce24d3 100644 --- a/src/main/java/br/com/exemplo/dataingestion/adapters/beans/MicrometerConfig.java +++ b/src/main/java/br/com/exemplo/dataingestion/adapters/beans/MicrometerConfig.java @@ -1,9 +1,10 @@ package br.com.exemplo.dataingestion.adapters.beans; -import io.micrometer.core.instrument.simple.SimpleMeterRegistry; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; +import io.micrometer.core.instrument.simple.SimpleMeterRegistry; + @Configuration public class MicrometerConfig { diff --git a/src/main/java/br/com/exemplo/dataingestion/adapters/events/entities/DataLancamentoEvent.java b/src/main/java/br/com/exemplo/dataingestion/adapters/events/entities/DataLancamentoEvent.java index 41e750c..f742ada 100644 --- a/src/main/java/br/com/exemplo/dataingestion/adapters/events/entities/DataLancamentoEvent.java +++ b/src/main/java/br/com/exemplo/dataingestion/adapters/events/entities/DataLancamentoEvent.java @@ -1,7 +1,11 @@ package br.com.exemplo.dataingestion.adapters.events.entities; -import br.com.exemplo.dataingestion.domain.entities.Lancamento; -import lombok.*; +import lombok.AllArgsConstructor; +import lombok.Builder; +import lombok.Getter; +import lombok.NoArgsConstructor; +import lombok.Setter; +import lombok.ToString; @Getter @Setter diff --git a/src/main/java/br/com/exemplo/dataingestion/adapters/events/entities/LancamentoEvent.java b/src/main/java/br/com/exemplo/dataingestion/adapters/events/entities/LancamentoEvent.java index 174bf4c..96e8c97 100644 --- a/src/main/java/br/com/exemplo/dataingestion/adapters/events/entities/LancamentoEvent.java +++ b/src/main/java/br/com/exemplo/dataingestion/adapters/events/entities/LancamentoEvent.java @@ -1,12 +1,15 @@ package br.com.exemplo.dataingestion.adapters.events.entities; -import br.com.exemplo.dataingestion.domain.entities.Conta; -import lombok.*; - -import java.util.List; import java.util.Map; import java.util.UUID; +import lombok.AllArgsConstructor; +import lombok.Builder; +import lombok.Getter; +import lombok.NoArgsConstructor; +import lombok.Setter; +import lombok.ToString; + @Getter @Setter @NoArgsConstructor diff --git a/src/main/java/br/com/exemplo/dataingestion/adapters/events/listener/LoadListener.java b/src/main/java/br/com/exemplo/dataingestion/adapters/events/listener/LoadListener.java index e47212d..c03e54d 100644 --- a/src/main/java/br/com/exemplo/dataingestion/adapters/events/listener/LoadListener.java +++ b/src/main/java/br/com/exemplo/dataingestion/adapters/events/listener/LoadListener.java @@ -1,50 +1,57 @@ package br.com.exemplo.dataingestion.adapters.events.listener; -import br.com.exemplo.dataingestion.adapters.events.entities.LoadEntity; -import br.com.exemplo.dataingestion.domain.producer.ProducerService; -import br.com.exemplo.dataingestion.util.CreateLancamento; -import io.micrometer.core.instrument.MeterRegistry; -import lombok.RequiredArgsConstructor; -import lombok.extern.slf4j.Slf4j; -import org.apache.kafka.clients.consumer.ConsumerRecord; +import java.util.concurrent.atomic.AtomicInteger; + import org.springframework.beans.factory.annotation.Value; -import org.springframework.context.ApplicationContext; import org.springframework.kafka.annotation.KafkaHandler; import org.springframework.kafka.annotation.KafkaListener; -import org.springframework.kafka.support.Acknowledgment; -import org.springframework.messaging.handler.annotation.MessageExceptionHandler; -import org.springframework.messaging.handler.annotation.Payload; import org.springframework.stereotype.Component; -import org.springframework.web.bind.annotation.PathVariable; -import org.springframework.web.bind.annotation.RequestBody; - -import javax.annotation.PostConstruct; -import java.util.List; -import java.util.Objects; -import java.util.concurrent.ExecutorService; -import java.util.concurrent.Executors; -import java.util.concurrent.atomic.AtomicInteger; -@Component -@RequiredArgsConstructor +import br.com.exemplo.dataingestion.adapters.events.entities.LoadEntity; +import br.com.exemplo.dataingestion.domain.producer.ProducerService; +import br.com.exemplo.dataingestion.util.CreateLancamento; +import io.micrometer.core.instrument.Counter; +import io.micrometer.core.instrument.Metrics; +import io.micrometer.core.instrument.Timer; +import lombok.extern.slf4j.Slf4j; + @Slf4j +@Component @KafkaListener(groupId = "${spring.kafka.consumer.group-id}", topics = "${data.ingestion.consumer.topic}", containerFactory = "kafkaListenerContainerFactory") public class LoadListener { - private final CreateLancamento createLancamento; - private final ApplicationContext applicationContext; + public LoadListener(CreateLancamento createLancamento, ProducerService producerService, @Value("${POD_NAME}") String podName) { + this.createLancamento = createLancamento; + this.producerService = producerService; + counter = Metrics.globalRegistry.counter("producer.account.items", "Type", podName); + timer = Metrics.globalRegistry.timer("producer.account.elapsed", "Type", podName); + } - private final MeterRegistry simpleMeterRegistry; - private ProducerService producerService; + private final CreateLancamento createLancamento; + private final ProducerService producerService; + private final Counter counter; + private final AtomicInteger accounts = new AtomicInteger(0); + private final Timer timer; @KafkaHandler public void geraEvento(LoadEntity loadEntity) { + log.debug("will generate {} records for account {}", loadEntity.getQuantidadeDias(), loadEntity.getIdConta()); + Timer.Sample sample = Timer.start(Metrics.globalRegistry); for (int i = 0; i < loadEntity.getQuantidadeDias(); i++) { producerService.produce(createLancamento.createByContaAndData(loadEntity.getIdConta(), i)); } + + counter.increment(); + sample.stop( + timer + ); + log.info( + "Accounts so far: {}", + accounts.incrementAndGet() + ); } } diff --git a/src/main/java/br/com/exemplo/dataingestion/adapters/events/mappers/LancamentoToEventMapperImpl.java b/src/main/java/br/com/exemplo/dataingestion/adapters/events/mappers/LancamentoToEventMapperImpl.java index 91cde47..af35451 100644 --- a/src/main/java/br/com/exemplo/dataingestion/adapters/events/mappers/LancamentoToEventMapperImpl.java +++ b/src/main/java/br/com/exemplo/dataingestion/adapters/events/mappers/LancamentoToEventMapperImpl.java @@ -1,11 +1,11 @@ package br.com.exemplo.dataingestion.adapters.events.mappers; +import org.springframework.stereotype.Component; + import br.com.exemplo.dataingestion.adapters.events.entities.ContaEvent; import br.com.exemplo.dataingestion.adapters.events.entities.DataLancamentoEvent; import br.com.exemplo.dataingestion.adapters.events.entities.LancamentoEvent; -import br.com.exemplo.dataingestion.domain.entities.Conta; import br.com.exemplo.dataingestion.domain.entities.Lancamento; -import org.springframework.stereotype.Component; @Component public class LancamentoToEventMapperImpl implements EventMapper { diff --git a/src/main/java/br/com/exemplo/dataingestion/adapters/events/producers/ProducerServiceImpl.java b/src/main/java/br/com/exemplo/dataingestion/adapters/events/producers/ProducerServiceImpl.java index 078fe78..f62ad22 100644 --- a/src/main/java/br/com/exemplo/dataingestion/adapters/events/producers/ProducerServiceImpl.java +++ b/src/main/java/br/com/exemplo/dataingestion/adapters/events/producers/ProducerServiceImpl.java @@ -1,36 +1,64 @@ package br.com.exemplo.dataingestion.adapters.events.producers; -import br.com.exemplo.dataingestion.adapters.events.entities.DataLancamentoEvent; -import br.com.exemplo.dataingestion.adapters.events.mappers.EventMapper; -import br.com.exemplo.dataingestion.domain.entities.Lancamento; -import br.com.exemplo.dataingestion.domain.producer.ProducerService; -import io.micrometer.core.instrument.MeterRegistry; -import io.micrometer.core.instrument.Timer; -import lombok.RequiredArgsConstructor; +import java.util.concurrent.atomic.AtomicInteger; + +import javax.annotation.PostConstruct; + import org.apache.kafka.clients.producer.ProducerRecord; +import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Value; import org.springframework.context.annotation.Scope; import org.springframework.kafka.core.KafkaTemplate; import org.springframework.stereotype.Component; -@RequiredArgsConstructor +import br.com.exemplo.dataingestion.adapters.events.entities.DataLancamentoEvent; +import br.com.exemplo.dataingestion.adapters.events.mappers.EventMapper; +import br.com.exemplo.dataingestion.domain.entities.Lancamento; +import br.com.exemplo.dataingestion.domain.producer.ProducerService; +import io.micrometer.core.instrument.Counter; +import io.micrometer.core.instrument.Metrics; +import io.micrometer.core.instrument.Timer; +import lombok.extern.slf4j.Slf4j; + +@Slf4j @Component @Scope(value = "prototype") public class ProducerServiceImpl implements ProducerService { + @Value("${POD_NAME}") + String podName; + @Value("${data.ingestion.producer.topic}") private String producerTopic; - private final KafkaTemplate kafkaTemplate; - private final EventMapper lancamentoDataLancamentoEventEventMapper; - private final MeterRegistry simpleMeterRegistry; + @Autowired + private KafkaTemplate kafkaTemplate; + + @Autowired + private EventMapper lancamentoDataLancamentoEventEventMapper; + + private Counter counter; + private final AtomicInteger records = new AtomicInteger(0); + private Timer timer; + + @PostConstruct + private void init() { + counter = Metrics.globalRegistry.counter("producer.items", "Type", podName); + timer = Metrics.globalRegistry.timer("producer.elapsed", "Type", podName); + } + @Override public Lancamento produce(Lancamento lancamento) { - simpleMeterRegistry.counter("kafka.contador","type","producao","thread",String.valueOf(Thread.currentThread().getId())).increment(); - Timer.Sample sample = Timer.start(simpleMeterRegistry); - ProducerRecord producerRecord = new ProducerRecord(producerTopic, lancamentoDataLancamentoEventEventMapper.convert(lancamento)); - sample.stop(simpleMeterRegistry.timer("kafka.time","type","producao","thread",String.valueOf(Thread.currentThread().getId()))); - kafkaTemplate.send(producerRecord); + Timer.Sample sample = Timer.start(Metrics.globalRegistry); + kafkaTemplate.send( + new ProducerRecord (producerTopic, lancamentoDataLancamentoEventEventMapper.convert(lancamento)) + ); + counter.increment(); + sample.stop(timer); + log.debug( + "Records so far: {}", + records.incrementAndGet() + ); return lancamento; } } diff --git a/src/main/java/br/com/exemplo/dataingestion/domain/entities/Lancamento.java b/src/main/java/br/com/exemplo/dataingestion/domain/entities/Lancamento.java index 8d8e6f3..357bc1a 100644 --- a/src/main/java/br/com/exemplo/dataingestion/domain/entities/Lancamento.java +++ b/src/main/java/br/com/exemplo/dataingestion/domain/entities/Lancamento.java @@ -1,11 +1,15 @@ package br.com.exemplo.dataingestion.domain.entities; -import lombok.*; - -import java.util.List; import java.util.Map; import java.util.UUID; +import lombok.AllArgsConstructor; +import lombok.Builder; +import lombok.Getter; +import lombok.NoArgsConstructor; +import lombok.Setter; +import lombok.ToString; + @Getter @Setter @NoArgsConstructor diff --git a/src/main/java/br/com/exemplo/dataingestion/domain/producer/ProducerService.java b/src/main/java/br/com/exemplo/dataingestion/domain/producer/ProducerService.java index c2df14b..f3dce7b 100644 --- a/src/main/java/br/com/exemplo/dataingestion/domain/producer/ProducerService.java +++ b/src/main/java/br/com/exemplo/dataingestion/domain/producer/ProducerService.java @@ -1,6 +1,5 @@ package br.com.exemplo.dataingestion.domain.producer; -import br.com.exemplo.dataingestion.adapters.events.entities.DataLancamentoEvent; import br.com.exemplo.dataingestion.domain.entities.Lancamento; public interface ProducerService { diff --git a/src/main/java/br/com/exemplo/dataingestion/util/CreateLancamento.java b/src/main/java/br/com/exemplo/dataingestion/util/CreateLancamento.java index a604f11..edc0fc8 100644 --- a/src/main/java/br/com/exemplo/dataingestion/util/CreateLancamento.java +++ b/src/main/java/br/com/exemplo/dataingestion/util/CreateLancamento.java @@ -1,28 +1,27 @@ package br.com.exemplo.dataingestion.util; -import br.com.exemplo.dataingestion.adapters.events.entities.ContaEvent; -import br.com.exemplo.dataingestion.adapters.events.entities.LancamentoEvent; -import br.com.exemplo.dataingestion.adapters.events.entities.Location; -import br.com.exemplo.dataingestion.domain.entities.Conta; -import br.com.exemplo.dataingestion.domain.entities.Lancamento; -import br.com.exemplo.dataingestion.domain.producer.ProducerService; -import com.github.javafaker.Faker; -import lombok.RequiredArgsConstructor; -import lombok.SneakyThrows; -import org.apache.commons.lang3.StringUtils; -import org.springframework.beans.factory.annotation.Value; -import org.springframework.stereotype.Component; - -import javax.annotation.PostConstruct; import java.math.BigDecimal; import java.time.OffsetDateTime; import java.time.format.DateTimeFormatter; import java.time.temporal.ChronoUnit; -import java.time.temporal.TemporalUnit; -import java.util.*; -import java.util.concurrent.ExecutorService; -import java.util.concurrent.Executors; -import java.util.concurrent.Future; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Random; +import java.util.UUID; + +import javax.annotation.PostConstruct; + +import com.github.javafaker.Faker; + +import org.springframework.beans.factory.annotation.Value; +import org.springframework.stereotype.Component; + +import br.com.exemplo.dataingestion.adapters.events.entities.Location; +import br.com.exemplo.dataingestion.domain.entities.Conta; +import br.com.exemplo.dataingestion.domain.entities.Lancamento; +import lombok.RequiredArgsConstructor; @Component @RequiredArgsConstructor @@ -30,7 +29,6 @@ public class CreateLancamento { private final Faker faker; - private ExecutorService executorService; private List lista = new ArrayList<>(); private Random random = new Random(); @Value("${processamento.threads.geracao.massa:10}") @@ -88,7 +86,7 @@ public Lancamento createByContaAndData(UUID idConta,int dias) { map.put("estabelecimento",faker.company().name()); map.put("location", Location.builder().lat(faker.address().latitude()).lon(faker.address().longitude()).build()); map.put("categoriaEstabelecimento",faker.commerce().department()); - return Lancamento.builder() + Lancamento lancamento = Lancamento.builder() .codigoMoedaTransacao("986") .codigoMotivoLancamento(String.valueOf(random.nextInt(999999))) .codigoTipoOperacao("TEF_CC_CC") @@ -107,5 +105,6 @@ public Lancamento createByContaAndData(UUID idConta,int dias) { .textoComplementoLancamento(faker.commerce().productName()+" "+lista.get(random.nextInt(lista.size()-1))) .valorLancamento(BigDecimal.valueOf(random.nextDouble()).toString()) .build(); + return lancamento; } } diff --git a/src/main/resources/application-aws.properties b/src/main/resources/application-aws.properties new file mode 100644 index 0000000..7fa19b0 --- /dev/null +++ b/src/main/resources/application-aws.properties @@ -0,0 +1,41 @@ +spring.main.web-application-type=none + +spring.kafka.producer.bootstrap-servers=${KAFKA_BOOTSTRAP} +spring.kafka.producer.key-serializer=org.apache.kafka.common.serialization.StringSerializer +spring.kafka.producer.value-serializer=org.springframework.kafka.support.serializer.JsonSerializer +spring.kafka.producer.ssl.protocol=SSL +spring.kafka.producer.ssl.trust-store-location=file:kafka.client.truststore.jks +spring.kafka.properties.security.protocol=SSL + +data.ingestion.producer.topic=generate + +spring.kafka.consumer.bootstrap-servers=${KAFKA_BOOTSTRAP} +spring.kafka.consumer.enable-auto-commit=false +spring.kafka.consumer.group-id=load-generate +spring.kafka.consumer.max-poll-records=100 +spring.kafka.consumer.fetch-min-size=80000 +spring.kafka.consumer.fetch-max-wait=500 +spring.kafka.consumer.key-deserializer=org.apache.kafka.common.serialization.StringDeserializer +spring.kafka.consumer.value-deserializer=org.springframework.kafka.support.serializer.JsonDeserializer +spring.kafka.consumer.properties.spring.json.trusted.packages=* +spring.kafka.consumer.ssl.protocol=SSL +spring.kafka.consumer.ssl.trust-store-location=file:kafka.client.truststore.jks +spring.kafka.listener.ack-mode=BATCH +spring.kafka.listener.concurrency=5 + +data.ingestion.consumer.topic=load + +logging.level.io.micrometer=INFO +management.metrics.enable.all=false +management.metrics.enable.producer=true +management.metrics.enable.kafka=true +management.metrics.enable.jvm=true +management.metrics.enable.process=true + +management.metrics.use-global-registry=true +management.metrics.export.cloudwatch.enabled=true +management.metrics.export.cloudwatch.namespace=${CLOUDWATCH_NAMESPACE} +management.metrics.export.cloudwatch.step=60s +management.metrics.export.cloudwatch.batchSize=50 + +cloud.aws.stack.auto=false \ No newline at end of file diff --git a/src/main/resources/application-local.properties b/src/main/resources/application-local.properties new file mode 100644 index 0000000..35f3df0 --- /dev/null +++ b/src/main/resources/application-local.properties @@ -0,0 +1,39 @@ +spring.main.web-application-type=none + +spring.kafka.producer.bootstrap-servers=240.12.0.2:9092 +spring.kafka.producer.key-serializer=org.apache.kafka.common.serialization.StringSerializer +spring.kafka.producer.value-serializer=org.springframework.kafka.support.serializer.JsonSerializer + +data.ingestion.producer.topic=generate + +spring.kafka.consumer.bootstrap-servers=240.12.0.2:9092 +spring.kafka.consumer.enable-auto-commit=true +spring.kafka.consumer.group-id=load-generate +spring.kafka.consumer.max-poll-records=${MAX_POOL_RECORDS:1} +spring.kafka.consumer.fetch-min-size=80000 +spring.kafka.consumer.fetch-max-wait=500 +spring.kafka.consumer.key-deserializer=org.apache.kafka.common.serialization.StringDeserializer +spring.kafka.consumer.value-deserializer=org.springframework.kafka.support.serializer.JsonDeserializer +spring.kafka.consumer.properties.spring.json.trusted.packages=* +spring.kafka.listener.ack-mode=BATCH +spring.kafka.listener.concurrency=${KAFKA_CONCURRENCY:1} + +data.ingestion.consumer.topic=load + +logging.level.io.micrometer=INFO +management.metrics.enable.all=false +management.metrics.enable.producer=true +management.metrics.enable.kafka=true +management.metrics.enable.jvm=true +management.metrics.enable.process=true + +management.metrics.use-global-registry=true +management.metrics.export.cloudwatch.enabled=true +management.metrics.export.cloudwatch.namespace=statement12 +management.metrics.export.cloudwatch.step=60s +management.metrics.export.cloudwatch.batchSize=50 + +cloud.aws.stack.auto=false +cloud.aws.credentials.profileName=default +cloud.aws.region.auto=false +cloud.aws.region.static=us-east-1 \ No newline at end of file diff --git a/src/main/resources/application-remote.properties b/src/main/resources/application-remote.properties new file mode 100644 index 0000000..243224e --- /dev/null +++ b/src/main/resources/application-remote.properties @@ -0,0 +1,44 @@ +spring.main.web-application-type=none + +spring.kafka.producer.bootstrap-servers=b-7.statement-kafka.99d6fi.c10.kafka.us-east-1.amazonaws.com:9094,b-9.statement-kafka.99d6fi.c10.kafka.us-east-1.amazonaws.com:9094,b-8.statement-kafka.99d6fi.c10.kafka.us-east-1.amazonaws.com:9094 +spring.kafka.producer.key-serializer=org.apache.kafka.common.serialization.StringSerializer +spring.kafka.producer.value-serializer=org.springframework.kafka.support.serializer.JsonSerializer +spring.kafka.producer.ssl.protocol=SSL +spring.kafka.producer.ssl.trust-store-location=file:kafka.client.truststore.jks +spring.kafka.properties.security.protocol=SSL + +data.ingestion.producer.topic=generate + +spring.kafka.consumer.bootstrap-servers=b-7.statement-kafka.99d6fi.c10.kafka.us-east-1.amazonaws.com:9094,b-9.statement-kafka.99d6fi.c10.kafka.us-east-1.amazonaws.com:9094,b-8.statement-kafka.99d6fi.c10.kafka.us-east-1.amazonaws.com:9094 +spring.kafka.consumer.enable-auto-commit=false +spring.kafka.consumer.group-id=load-generate +spring.kafka.consumer.max-poll-records=1 +spring.kafka.consumer.fetch-min-size=80000 +spring.kafka.consumer.fetch-max-wait=500 +spring.kafka.consumer.key-deserializer=org.apache.kafka.common.serialization.StringDeserializer +spring.kafka.consumer.value-deserializer=org.springframework.kafka.support.serializer.JsonDeserializer +spring.kafka.consumer.properties.spring.json.trusted.packages=* +spring.kafka.consumer.ssl.protocol=SSL +spring.kafka.consumer.ssl.trust-store-location=file:kafka.client.truststore.jks +spring.kafka.listener.ack-mode=BATCH +spring.kafka.listener.concurrency=1 + +data.ingestion.consumer.topic=load + +logging.level.io.micrometer=INFO +management.metrics.enable.all=false +management.metrics.enable.producer=true +management.metrics.enable.kafka=true +management.metrics.enable.jvm=true +management.metrics.enable.process=true + +management.metrics.use-global-registry=true +management.metrics.export.cloudwatch.enabled=true +management.metrics.export.cloudwatch.namespace=statement12 +management.metrics.export.cloudwatch.step=60s +management.metrics.export.cloudwatch.batchSize=50 + +cloud.aws.region.static=us-east-1 +cloud.aws.stack.auto=false +cloud.aws.credentials.profileName=default +cloud.aws.region.auto=false \ No newline at end of file diff --git a/src/main/resources/application.properties b/src/main/resources/application.properties deleted file mode 100644 index 596a274..0000000 --- a/src/main/resources/application.properties +++ /dev/null @@ -1,27 +0,0 @@ -server.port=8080 -spring.kafka.producer.bootstrap-servers=${KAFKA_BOOTSTRAP} -spring.kafka.producer.key-serializer=org.apache.kafka.common.serialization.StringSerializer -spring.kafka.producer.value-serializer=org.springframework.kafka.support.serializer.JsonSerializer -spring.kafka.producer.ssl.trust-store-location=file:kafka.client.truststore.jks -spring.kafka.producer.ssl.protocol=SSL -spring.kafka.properties.security.protocol=SSL -data.ingestion.producer.topic=${KAFKA_PRODUCER_TOPIC} -management.endpoints.web.exposure.include=info,health,prometheus,metrics - -spring.kafka.consumer.auto-offset-reset=earliest -spring.kafka.consumer.bootstrap-servers=${KAFKA_BOOTSTRAP} -spring.kafka.consumer.enable-auto-commit=false -spring.kafka.consumer.group-id=${KAFKA_GROUP_ID} -spring.kafka.consumer.max-poll-records=${MAX_POOL_RECORDS:1} -spring.kafka.consumer.fetch-max-wait=30000 -spring.kafka.consumer.key-deserializer=org.apache.kafka.common.serialization.StringDeserializer -spring.kafka.consumer.value-deserializer=org.springframework.kafka.support.serializer.JsonDeserializer -spring.kafka.consumer.properties.spring.json.trusted.packages=* -spring.kafka.listener.ack-mode=BATCH -spring.kafka.listener.concurrency=${KAFKA_CONCURRENCY:10} -data.ingestion.consumer.topic=${KAFKA_CONSUMER_TOPIC} -spring.kafka.consumer.ssl.protocol=SSL -spring.kafka.consumer.ssl.truststore-location=file:kafka.client.truststore.jks - -logging.dir=logs -logging.filename=application.log \ No newline at end of file diff --git a/src/test/java/br/com/exemplo/dataingestion/util/CreateLancamentoTest.java b/src/test/java/br/com/exemplo/dataingestion/util/CreateLancamentoTest.java index 5e4a848..9cbbb91 100644 --- a/src/test/java/br/com/exemplo/dataingestion/util/CreateLancamentoTest.java +++ b/src/test/java/br/com/exemplo/dataingestion/util/CreateLancamentoTest.java @@ -1,26 +1,19 @@ package br.com.exemplo.dataingestion.util; -import com.github.javafaker.Faker; -import lombok.SneakyThrows; -import org.apache.commons.lang3.StringUtils; -import org.junit.jupiter.api.Test; - -import java.io.*; -import java.util.HashSet; -import java.util.Set; +import java.io.BufferedWriter; +import java.io.FileOutputStream; +import java.io.OutputStreamWriter; import java.util.UUID; -import java.util.concurrent.ExecutorService; -import java.util.concurrent.ThreadPoolExecutor; -import static org.junit.jupiter.api.Assertions.*; +import org.apache.commons.lang3.StringUtils; + +import lombok.SneakyThrows; class CreateLancamentoTest { @SneakyThrows @org.junit.jupiter.api.Test void getIdConta() { - CreateLancamento createLancamento = new CreateLancamento(new Faker()); - Set set = new HashSet<>(); FileOutputStream outputStream = new FileOutputStream("MyFile.txt"); OutputStreamWriter outputStreamWriter = new OutputStreamWriter(outputStream, "UTF-16"); BufferedWriter bufferedWriter = new BufferedWriter(outputStreamWriter); From 2e494c5ddb156a0d73b099ff5917406b959177be Mon Sep 17 00:00:00 2001 From: edsena Date: Fri, 6 Nov 2020 13:44:23 -0300 Subject: [PATCH 6/6] setting the initial date for statetement creation --- .../adapters/events/entities/ContaEvent.java | 2 +- .../adapters/events/entities/LoadEntity.java | 11 +++++++++-- .../adapters/events/listener/LoadListener.java | 4 ++-- .../dataingestion/util/CreateLancamento.java | 16 ++++++++++++++-- 4 files changed, 26 insertions(+), 7 deletions(-) diff --git a/src/main/java/br/com/exemplo/dataingestion/adapters/events/entities/ContaEvent.java b/src/main/java/br/com/exemplo/dataingestion/adapters/events/entities/ContaEvent.java index bc058e8..23a632e 100644 --- a/src/main/java/br/com/exemplo/dataingestion/adapters/events/entities/ContaEvent.java +++ b/src/main/java/br/com/exemplo/dataingestion/adapters/events/entities/ContaEvent.java @@ -13,4 +13,4 @@ public class ContaEvent { private UUID numeroUnicoConta; private String codigoSufixoConta; -} +} \ No newline at end of file diff --git a/src/main/java/br/com/exemplo/dataingestion/adapters/events/entities/LoadEntity.java b/src/main/java/br/com/exemplo/dataingestion/adapters/events/entities/LoadEntity.java index 563e906..29e2900 100644 --- a/src/main/java/br/com/exemplo/dataingestion/adapters/events/entities/LoadEntity.java +++ b/src/main/java/br/com/exemplo/dataingestion/adapters/events/entities/LoadEntity.java @@ -1,9 +1,15 @@ package br.com.exemplo.dataingestion.adapters.events.entities; -import lombok.*; - +import java.time.LocalDate; import java.util.UUID; +import lombok.AllArgsConstructor; +import lombok.Builder; +import lombok.Getter; +import lombok.NoArgsConstructor; +import lombok.Setter; +import lombok.ToString; + @Getter @Setter @NoArgsConstructor @@ -13,4 +19,5 @@ public class LoadEntity { private UUID idConta; private int quantidadeDias; + private LocalDate dataFim; } diff --git a/src/main/java/br/com/exemplo/dataingestion/adapters/events/listener/LoadListener.java b/src/main/java/br/com/exemplo/dataingestion/adapters/events/listener/LoadListener.java index c03e54d..17ff8d6 100644 --- a/src/main/java/br/com/exemplo/dataingestion/adapters/events/listener/LoadListener.java +++ b/src/main/java/br/com/exemplo/dataingestion/adapters/events/listener/LoadListener.java @@ -38,10 +38,10 @@ public LoadListener(CreateLancamento createLancamento, ProducerService producerS @KafkaHandler public void geraEvento(LoadEntity loadEntity) { - log.debug("will generate {} records for account {}", loadEntity.getQuantidadeDias(), loadEntity.getIdConta()); + log.debug("will generate {} records for account {} starting at {}", loadEntity.getQuantidadeDias(), loadEntity.getIdConta(), loadEntity.getDataFim()); Timer.Sample sample = Timer.start(Metrics.globalRegistry); for (int i = 0; i < loadEntity.getQuantidadeDias(); i++) { - producerService.produce(createLancamento.createByContaAndData(loadEntity.getIdConta(), i)); + producerService.produce(createLancamento.createByContaAndData(loadEntity.getIdConta(), i, loadEntity.getDataFim())); } counter.increment(); diff --git a/src/main/java/br/com/exemplo/dataingestion/util/CreateLancamento.java b/src/main/java/br/com/exemplo/dataingestion/util/CreateLancamento.java index edc0fc8..0e9fbda 100644 --- a/src/main/java/br/com/exemplo/dataingestion/util/CreateLancamento.java +++ b/src/main/java/br/com/exemplo/dataingestion/util/CreateLancamento.java @@ -1,7 +1,10 @@ package br.com.exemplo.dataingestion.util; import java.math.BigDecimal; +import java.time.LocalDate; +import java.time.LocalTime; import java.time.OffsetDateTime; +import java.time.ZoneOffset; import java.time.format.DateTimeFormatter; import java.time.temporal.ChronoUnit; import java.util.ArrayList; @@ -79,8 +82,17 @@ public Lancamento create123() } - public Lancamento createByContaAndData(UUID idConta,int dias) { - String data = OffsetDateTime.now().minus(dias,ChronoUnit.DAYS).format(DateTimeFormatter.ISO_DATE_TIME); + public Lancamento createByContaAndData(UUID idConta, int dias, LocalDate dataFim) { + String data = OffsetDateTime.of( + dataFim, + LocalTime.MIDNIGHT, + ZoneOffset.ofHours(-3) + ).minus( + dias, + ChronoUnit.DAYS + ).format( + DateTimeFormatter.ISO_DATE_TIME + ); Map map = new HashMap<>(); map.put("nome",faker.name().fullName()); map.put("estabelecimento",faker.company().name());