diff --git a/Dockerfile b/Dockerfile index 76cecc5..110287e 100644 --- a/Dockerfile +++ b/Dockerfile @@ -1,7 +1,5 @@ -FROM adoptopenjdk/openjdk11:jdk-11.0.2.9-slim +FROM adoptopenjdk/openjdk11:jre-11.0.8_10-alpine WORKDIR /opt -ENV PORT 8080 -EXPOSE 8080 COPY kafka.client.truststore.jks /opt/kafka.client.truststore.jks COPY target/*.jar /opt/app.jar -ENTRYPOINT exec java $JAVA_OPTS -jar app.jar \ No newline at end of file +ENTRYPOINT exec java $JAVA_OPTS -jar app.jar --spring.profiles.active=aws --spring.config.location=/tmp/ \ No newline at end of file diff --git a/README.md b/README.md deleted file mode 100644 index a4588e3..0000000 --- a/README.md +++ /dev/null @@ -1,26 +0,0 @@ -# data-ingestion - -bin/kafka-topics.sh --create --bootstrap-server localhost:9092 --replication-factor 1 --partitions 10 --topic ingestion - -bin/kafka-console-consumer.sh --topic ingestion --from-beginning --bootstrap-server localhost:9092 - -docker run --rm -e ADV_HOST=localhost -p 3030:3030 -p 8082:8082 -p 9092:9092 -p 2181:2181 -p 8081:8081 -e RUNNING_SAMPLEDATA=0 -e RUNTESTS=0 -e CONNECT_PORT=0 lensesio/fast-data-dev - -docker run -d --name es762 -p 9200:9200 -e "discovery.type=single-node" elasticsearch:7.6.2 - -curl -X POST "http://localhost:9200/extrato/_search" -d'{"query":{"query_string":{"query": "8bfca9fa-ecdf-4f18-bfe3-e7f3c0360a40"}}}' -H 'Content-type:application/json' - - -curl -X POST "http://localhost:9200/extrato/_search" -d'{"query":{"query_string":{"query": "946eccbc-a87b-4aa5-b856-a13ca34ced98"}}}' -H 'Content-type:application/json' - -docker run -d --name grafana -p 3000:3000 grafana/grafana - -docker run -d --name prometheus -p 9090:9090 -v /home/capiau3/Desktop/Projetos/prometheus.yml:/etc/prometheus/prometheus.yml prom/prometheus --config.file=/etc/prometheus/prometheus.yml - -ifconfig -a - -curl -X POST "http://localhost:9200/extrato/_search?pretty" - -bin/kafka-consumer-groups.sh --bootstrap-server localhost:9092 --describe --group data-ingestion - -curl localhost:8080/geraevento/10000/1000000 \ No newline at end of file diff --git a/load-generate.yaml b/load-generate.yaml new file mode 100644 index 0000000..cc3a02e --- /dev/null +++ b/load-generate.yaml @@ -0,0 +1,61 @@ +apiVersion: apps/v1 +kind: Deployment +metadata: + name: load-generate + labels: + app: load-generate + namespace: load-generate +spec: + replicas: 0 + selector: + matchLabels: + app: load-generate + template: + metadata: + labels: + app: load-generate + spec: + serviceAccountName: statement-demo + containers: + - name: load-generate + image: 837108680928.dkr.ecr.us-east-1.amazonaws.com/load-generate:latest + imagePullPolicy: Always + volumeMounts: + - mountPath: /tmp/application-aws.properties + name: config + subPath: application-aws.properties + resources: + limits: + memory: "3Gi" + cpu: 2 + requests: + memory: "3Gi" + cpu: 2 + env: + - name: JAVA_OPTS + value: "-XX:MaxRAMPercentage=90.0" + - name: KAFKA_BOOTSTRAP + valueFrom: + configMapKeyRef: + name: kafka + key: bootstrap + - name: KAFKA_GROUP_ID + value: "load-generate" + - name: KAFKA_MAX_POOL_RECORDS + value: "1" + - name: KAFKA_CONCURRENCY + value: "10" + - name: KAFKA_PRODUCER_TOPIC + value: "generate" + - name: KAFKA_CONSUMER_TOPIC + value: "load" + - name: CLOUDWATCH_NAMESPACE + value: "statement12" + - name: POD_NAME + valueFrom: + fieldRef: + fieldPath: metadata.name + volumes: + - configMap: + name: application.properties + name: config diff --git a/pom.xml b/pom.xml index be6ec43..2611651 100644 --- a/pom.xml +++ b/pom.xml @@ -15,7 +15,8 @@ Demo project for Spring Boot - 1.8 + 11 + 1.11.887 @@ -23,15 +24,10 @@ org.springframework.boot spring-boot-starter-actuator - - org.springframework.boot - spring-boot-starter-web - org.springframework.kafka spring-kafka - org.projectlombok lombok @@ -42,50 +38,71 @@ commons-lang3 3.11 - - io.micrometer - micrometer-registry-prometheus - com.github.javafaker javafaker 1.0.2 - - org.springframework.boot - spring-boot-starter-test - test - - - org.junit.vintage - junit-vintage-engine - - - org.springframework.kafka spring-kafka-test test - - junit - junit - test - - junit - junit - test + io.micrometer + micrometer-core + 1.5.5 + + + com.amazonaws + aws-java-sdk-bom + ${aws.sdk.version} + pom + + + com.amazonaws + aws-java-sdk-sts + ${aws.sdk.version} + + + com.amazonaws + aws-java-sdk-cloudwatch + ${aws.sdk.version} + + + org.springframework.cloud + spring-cloud-starter-aws + 2.2.4.RELEASE + + + io.micrometer + micrometer-registry-cloudwatch + 1.5.5 - - - - org.springframework.boot - spring-boot-maven-plugin - - + + + org.apache.maven.plugins + maven-jar-plugin + + + **/application*.properties + + + + + org.springframework.boot + spring-boot-maven-plugin + + + + repackage + + + + + diff --git a/prometheus.yml b/prometheus.yml deleted file mode 100644 index f19aa03..0000000 --- a/prometheus.yml +++ /dev/null @@ -1,26 +0,0 @@ -# my global config -global: - scrape_interval: 15s # Set the scrape interval to every 15 seconds. Default is every 1 minute. - evaluation_interval: 15s # Evaluate rules every 15 seconds. The default is every 1 minute. - # scrape_timeout is set to the global default (10s). - -# Load rules once and periodically evaluate them according to the global 'evaluation_interval'. -rule_files: - # - "first_rules.yml" - # - "second_rules.yml" - -# A scrape configuration containing exactly one endpoint to scrape: -# Here it's Prometheus itself. -scrape_configs: - # The job name is added as a label `job=` to any timeseries scraped from this config. - - job_name: 'prometheus' - # metrics_path defaults to '/metrics' - # scheme defaults to 'http'. - static_configs: - - targets: ['127.0.0.1:9090'] - - - job_name: 'spring-actuator' - metrics_path: '/actuator/prometheus' - scrape_interval: 5s - static_configs: - - targets: ['192.168.100.123:8080'] diff --git a/src/main/java/br/com/exemplo/dataingestion/LoadGenerateApplication.java b/src/main/java/br/com/exemplo/dataingestion/LoadGenerateApplication.java index 9ca4741..b9635c6 100644 --- a/src/main/java/br/com/exemplo/dataingestion/LoadGenerateApplication.java +++ b/src/main/java/br/com/exemplo/dataingestion/LoadGenerateApplication.java @@ -1,42 +1,14 @@ package br.com.exemplo.dataingestion; -import br.com.exemplo.dataingestion.adapters.controllers.servers.GenerateLoadController; -import lombok.RequiredArgsConstructor; -import lombok.extern.slf4j.Slf4j; -import org.springframework.beans.factory.annotation.Value; -import org.springframework.boot.CommandLineRunner; import org.springframework.boot.SpringApplication; import org.springframework.boot.autoconfigure.SpringBootApplication; -import org.springframework.boot.context.event.ApplicationReadyEvent; -import org.springframework.context.event.EventListener; -import javax.annotation.PostConstruct; +import lombok.RequiredArgsConstructor; @SpringBootApplication @RequiredArgsConstructor -@Slf4j public class LoadGenerateApplication { - - @Value("${registros.total:1000000}") - private int registro; - - @Value("${contas.total:1000}") - private int contas; - - @Value("${dias.total:90}") - private int dias; - - private final GenerateLoadController generateLoadController; - public static void main(String[] args) { SpringApplication.run(LoadGenerateApplication.class, args); } - - - @EventListener(ApplicationReadyEvent.class) - public void run() throws Exception { - log.info("Iniciando a produção de {} com {} contas e com {} dias retroativos",registro,contas,dias); - generateLoadController.geraEvento(contas,registro,dias); - log.info("Liberando comando da aplicação"); - } } diff --git a/src/main/java/br/com/exemplo/dataingestion/adapters/beans/KafkaConfig.java b/src/main/java/br/com/exemplo/dataingestion/adapters/beans/KafkaConfig.java index de8230b..41b0ad5 100644 --- a/src/main/java/br/com/exemplo/dataingestion/adapters/beans/KafkaConfig.java +++ b/src/main/java/br/com/exemplo/dataingestion/adapters/beans/KafkaConfig.java @@ -1,27 +1,24 @@ package br.com.exemplo.dataingestion.adapters.beans; -import br.com.exemplo.dataingestion.adapters.events.entities.DataLancamentoEvent; -import org.apache.kafka.clients.producer.ProducerConfig; -import org.springframework.beans.factory.annotation.Qualifier; +import org.springframework.beans.factory.annotation.Value; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; -import org.springframework.context.annotation.Scope; import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory; import org.springframework.kafka.core.ConsumerFactory; -import org.springframework.kafka.core.KafkaTemplate; -import org.springframework.kafka.core.ProducerFactory; -import java.util.HashMap; -import java.util.Map; -import java.util.UUID; +import br.com.exemplo.dataingestion.adapters.events.entities.LoadEntity; @Configuration public class KafkaConfig { + @Value("${spring.kafka.listener.concurrency}") + private int concurrency; + + @Bean - @Scope(value = "prototype") - public KafkaTemplate kafkaTemplate1(ProducerFactory producerFactory){ - Map map = new HashMap<>(); - map.put(ProducerConfig.CLIENT_ID_CONFIG,UUID.randomUUID().toString()); - return new KafkaTemplate(producerFactory,false,map); + public ConcurrentKafkaListenerContainerFactory kafkaListenerContainerFactory(ConsumerFactory consumerFactory){ + ConcurrentKafkaListenerContainerFactory factory = new ConcurrentKafkaListenerContainerFactory(); + factory.setConsumerFactory(consumerFactory); + factory.setConcurrency(concurrency); + return factory; } } diff --git a/src/main/java/br/com/exemplo/dataingestion/adapters/beans/MicrometerConfig.java b/src/main/java/br/com/exemplo/dataingestion/adapters/beans/MicrometerConfig.java index e6d9c98..7ce24d3 100644 --- a/src/main/java/br/com/exemplo/dataingestion/adapters/beans/MicrometerConfig.java +++ b/src/main/java/br/com/exemplo/dataingestion/adapters/beans/MicrometerConfig.java @@ -1,9 +1,10 @@ package br.com.exemplo.dataingestion.adapters.beans; -import io.micrometer.core.instrument.simple.SimpleMeterRegistry; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; +import io.micrometer.core.instrument.simple.SimpleMeterRegistry; + @Configuration public class MicrometerConfig { diff --git a/src/main/java/br/com/exemplo/dataingestion/adapters/controllers/servers/GenerateLoadController.java b/src/main/java/br/com/exemplo/dataingestion/adapters/controllers/servers/GenerateLoadController.java deleted file mode 100644 index 395d37e..0000000 --- a/src/main/java/br/com/exemplo/dataingestion/adapters/controllers/servers/GenerateLoadController.java +++ /dev/null @@ -1,76 +0,0 @@ -package br.com.exemplo.dataingestion.adapters.controllers.servers; - -import br.com.exemplo.dataingestion.adapters.events.entities.DataLancamentoEvent; -import br.com.exemplo.dataingestion.domain.entities.Lancamento; -import br.com.exemplo.dataingestion.domain.producer.ProducerService; -import br.com.exemplo.dataingestion.util.CreateLancamento; -import io.micrometer.core.instrument.MeterRegistry; -import io.micrometer.core.instrument.Timer; -import lombok.RequiredArgsConstructor; -import lombok.extern.slf4j.Slf4j; -import org.springframework.beans.factory.annotation.Value; -import org.springframework.context.ApplicationContext; -import org.springframework.http.ResponseEntity; -import org.springframework.kafka.core.KafkaTemplate; -import org.springframework.stereotype.Component; -import org.springframework.web.bind.annotation.GetMapping; -import org.springframework.web.bind.annotation.PathVariable; -import org.springframework.web.bind.annotation.RestController; - -import javax.annotation.PostConstruct; -import java.applet.Applet; -import java.lang.reflect.Array; -import java.util.ArrayList; -import java.util.List; -import java.util.concurrent.ExecutorService; -import java.util.concurrent.Executors; -import java.util.concurrent.ScheduledExecutorService; -import java.util.concurrent.ThreadPoolExecutor; -import java.util.concurrent.atomic.AtomicInteger; -import java.util.concurrent.atomic.AtomicReferenceArray; - -@Component -@RequiredArgsConstructor -@Slf4j -public class GenerateLoadController { - - private final CreateLancamento createLancamento; - private final ApplicationContext applicationContext; - - private ExecutorService executorService; - private final List producerServiceList; - private final MeterRegistry simpleMeterRegistry; - - @Value("${processamento.threads.producao:10}") - private int numeroThreadsProducao; - - @PostConstruct - public void constroiProducer() - { - this.executorService = Executors.newFixedThreadPool(numeroThreadsProducao); - log.debug("Inicializando produtores"); - for(int i=0;i { - log.info("Inicializando thread {} com {} registros",Thread.currentThread().getId(),numeroItensThread.get()); - createLancamento.create(numeroItensThread.get(), qtdConta,qtdDias,producerService); - log.info("Finalizando Thread thread {} ",Thread.currentThread().getId(),numeroItensThread.get()); - }); - } - } -} diff --git a/src/main/java/br/com/exemplo/dataingestion/adapters/events/entities/ContaEvent.java b/src/main/java/br/com/exemplo/dataingestion/adapters/events/entities/ContaEvent.java index bc058e8..23a632e 100644 --- a/src/main/java/br/com/exemplo/dataingestion/adapters/events/entities/ContaEvent.java +++ b/src/main/java/br/com/exemplo/dataingestion/adapters/events/entities/ContaEvent.java @@ -13,4 +13,4 @@ public class ContaEvent { private UUID numeroUnicoConta; private String codigoSufixoConta; -} +} \ No newline at end of file diff --git a/src/main/java/br/com/exemplo/dataingestion/adapters/events/entities/DataLancamentoEvent.java b/src/main/java/br/com/exemplo/dataingestion/adapters/events/entities/DataLancamentoEvent.java index 41e750c..f742ada 100644 --- a/src/main/java/br/com/exemplo/dataingestion/adapters/events/entities/DataLancamentoEvent.java +++ b/src/main/java/br/com/exemplo/dataingestion/adapters/events/entities/DataLancamentoEvent.java @@ -1,7 +1,11 @@ package br.com.exemplo.dataingestion.adapters.events.entities; -import br.com.exemplo.dataingestion.domain.entities.Lancamento; -import lombok.*; +import lombok.AllArgsConstructor; +import lombok.Builder; +import lombok.Getter; +import lombok.NoArgsConstructor; +import lombok.Setter; +import lombok.ToString; @Getter @Setter diff --git a/src/main/java/br/com/exemplo/dataingestion/adapters/events/entities/LancamentoEvent.java b/src/main/java/br/com/exemplo/dataingestion/adapters/events/entities/LancamentoEvent.java index 174bf4c..96e8c97 100644 --- a/src/main/java/br/com/exemplo/dataingestion/adapters/events/entities/LancamentoEvent.java +++ b/src/main/java/br/com/exemplo/dataingestion/adapters/events/entities/LancamentoEvent.java @@ -1,12 +1,15 @@ package br.com.exemplo.dataingestion.adapters.events.entities; -import br.com.exemplo.dataingestion.domain.entities.Conta; -import lombok.*; - -import java.util.List; import java.util.Map; import java.util.UUID; +import lombok.AllArgsConstructor; +import lombok.Builder; +import lombok.Getter; +import lombok.NoArgsConstructor; +import lombok.Setter; +import lombok.ToString; + @Getter @Setter @NoArgsConstructor diff --git a/src/main/java/br/com/exemplo/dataingestion/adapters/events/entities/LoadEntity.java b/src/main/java/br/com/exemplo/dataingestion/adapters/events/entities/LoadEntity.java new file mode 100644 index 0000000..29e2900 --- /dev/null +++ b/src/main/java/br/com/exemplo/dataingestion/adapters/events/entities/LoadEntity.java @@ -0,0 +1,23 @@ +package br.com.exemplo.dataingestion.adapters.events.entities; + +import java.time.LocalDate; +import java.util.UUID; + +import lombok.AllArgsConstructor; +import lombok.Builder; +import lombok.Getter; +import lombok.NoArgsConstructor; +import lombok.Setter; +import lombok.ToString; + +@Getter +@Setter +@NoArgsConstructor +@AllArgsConstructor +@Builder +@ToString +public class LoadEntity { + private UUID idConta; + private int quantidadeDias; + private LocalDate dataFim; +} diff --git a/src/main/java/br/com/exemplo/dataingestion/adapters/events/listener/LoadListener.java b/src/main/java/br/com/exemplo/dataingestion/adapters/events/listener/LoadListener.java new file mode 100644 index 0000000..17ff8d6 --- /dev/null +++ b/src/main/java/br/com/exemplo/dataingestion/adapters/events/listener/LoadListener.java @@ -0,0 +1,57 @@ +package br.com.exemplo.dataingestion.adapters.events.listener; + +import java.util.concurrent.atomic.AtomicInteger; + +import org.springframework.beans.factory.annotation.Value; +import org.springframework.kafka.annotation.KafkaHandler; +import org.springframework.kafka.annotation.KafkaListener; +import org.springframework.stereotype.Component; + +import br.com.exemplo.dataingestion.adapters.events.entities.LoadEntity; +import br.com.exemplo.dataingestion.domain.producer.ProducerService; +import br.com.exemplo.dataingestion.util.CreateLancamento; +import io.micrometer.core.instrument.Counter; +import io.micrometer.core.instrument.Metrics; +import io.micrometer.core.instrument.Timer; +import lombok.extern.slf4j.Slf4j; + +@Slf4j +@Component +@KafkaListener(groupId = "${spring.kafka.consumer.group-id}", + topics = "${data.ingestion.consumer.topic}", + containerFactory = "kafkaListenerContainerFactory") +public class LoadListener { + + public LoadListener(CreateLancamento createLancamento, ProducerService producerService, @Value("${POD_NAME}") String podName) { + this.createLancamento = createLancamento; + this.producerService = producerService; + counter = Metrics.globalRegistry.counter("producer.account.items", "Type", podName); + timer = Metrics.globalRegistry.timer("producer.account.elapsed", "Type", podName); + } + + private final CreateLancamento createLancamento; + private final ProducerService producerService; + + private final Counter counter; + private final AtomicInteger accounts = new AtomicInteger(0); + private final Timer timer; + + @KafkaHandler + public void geraEvento(LoadEntity loadEntity) { + log.debug("will generate {} records for account {} starting at {}", loadEntity.getQuantidadeDias(), loadEntity.getIdConta(), loadEntity.getDataFim()); + Timer.Sample sample = Timer.start(Metrics.globalRegistry); + for (int i = 0; i < loadEntity.getQuantidadeDias(); i++) { + producerService.produce(createLancamento.createByContaAndData(loadEntity.getIdConta(), i, loadEntity.getDataFim())); + } + + counter.increment(); + sample.stop( + timer + ); + log.info( + "Accounts so far: {}", + accounts.incrementAndGet() + ); + } + +} diff --git a/src/main/java/br/com/exemplo/dataingestion/adapters/events/mappers/LancamentoToEventMapperImpl.java b/src/main/java/br/com/exemplo/dataingestion/adapters/events/mappers/LancamentoToEventMapperImpl.java index 91cde47..af35451 100644 --- a/src/main/java/br/com/exemplo/dataingestion/adapters/events/mappers/LancamentoToEventMapperImpl.java +++ b/src/main/java/br/com/exemplo/dataingestion/adapters/events/mappers/LancamentoToEventMapperImpl.java @@ -1,11 +1,11 @@ package br.com.exemplo.dataingestion.adapters.events.mappers; +import org.springframework.stereotype.Component; + import br.com.exemplo.dataingestion.adapters.events.entities.ContaEvent; import br.com.exemplo.dataingestion.adapters.events.entities.DataLancamentoEvent; import br.com.exemplo.dataingestion.adapters.events.entities.LancamentoEvent; -import br.com.exemplo.dataingestion.domain.entities.Conta; import br.com.exemplo.dataingestion.domain.entities.Lancamento; -import org.springframework.stereotype.Component; @Component public class LancamentoToEventMapperImpl implements EventMapper { diff --git a/src/main/java/br/com/exemplo/dataingestion/adapters/events/producers/ProducerServiceImpl.java b/src/main/java/br/com/exemplo/dataingestion/adapters/events/producers/ProducerServiceImpl.java index 078fe78..f62ad22 100644 --- a/src/main/java/br/com/exemplo/dataingestion/adapters/events/producers/ProducerServiceImpl.java +++ b/src/main/java/br/com/exemplo/dataingestion/adapters/events/producers/ProducerServiceImpl.java @@ -1,36 +1,64 @@ package br.com.exemplo.dataingestion.adapters.events.producers; -import br.com.exemplo.dataingestion.adapters.events.entities.DataLancamentoEvent; -import br.com.exemplo.dataingestion.adapters.events.mappers.EventMapper; -import br.com.exemplo.dataingestion.domain.entities.Lancamento; -import br.com.exemplo.dataingestion.domain.producer.ProducerService; -import io.micrometer.core.instrument.MeterRegistry; -import io.micrometer.core.instrument.Timer; -import lombok.RequiredArgsConstructor; +import java.util.concurrent.atomic.AtomicInteger; + +import javax.annotation.PostConstruct; + import org.apache.kafka.clients.producer.ProducerRecord; +import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Value; import org.springframework.context.annotation.Scope; import org.springframework.kafka.core.KafkaTemplate; import org.springframework.stereotype.Component; -@RequiredArgsConstructor +import br.com.exemplo.dataingestion.adapters.events.entities.DataLancamentoEvent; +import br.com.exemplo.dataingestion.adapters.events.mappers.EventMapper; +import br.com.exemplo.dataingestion.domain.entities.Lancamento; +import br.com.exemplo.dataingestion.domain.producer.ProducerService; +import io.micrometer.core.instrument.Counter; +import io.micrometer.core.instrument.Metrics; +import io.micrometer.core.instrument.Timer; +import lombok.extern.slf4j.Slf4j; + +@Slf4j @Component @Scope(value = "prototype") public class ProducerServiceImpl implements ProducerService { + @Value("${POD_NAME}") + String podName; + @Value("${data.ingestion.producer.topic}") private String producerTopic; - private final KafkaTemplate kafkaTemplate; - private final EventMapper lancamentoDataLancamentoEventEventMapper; - private final MeterRegistry simpleMeterRegistry; + @Autowired + private KafkaTemplate kafkaTemplate; + + @Autowired + private EventMapper lancamentoDataLancamentoEventEventMapper; + + private Counter counter; + private final AtomicInteger records = new AtomicInteger(0); + private Timer timer; + + @PostConstruct + private void init() { + counter = Metrics.globalRegistry.counter("producer.items", "Type", podName); + timer = Metrics.globalRegistry.timer("producer.elapsed", "Type", podName); + } + @Override public Lancamento produce(Lancamento lancamento) { - simpleMeterRegistry.counter("kafka.contador","type","producao","thread",String.valueOf(Thread.currentThread().getId())).increment(); - Timer.Sample sample = Timer.start(simpleMeterRegistry); - ProducerRecord producerRecord = new ProducerRecord(producerTopic, lancamentoDataLancamentoEventEventMapper.convert(lancamento)); - sample.stop(simpleMeterRegistry.timer("kafka.time","type","producao","thread",String.valueOf(Thread.currentThread().getId()))); - kafkaTemplate.send(producerRecord); + Timer.Sample sample = Timer.start(Metrics.globalRegistry); + kafkaTemplate.send( + new ProducerRecord (producerTopic, lancamentoDataLancamentoEventEventMapper.convert(lancamento)) + ); + counter.increment(); + sample.stop(timer); + log.debug( + "Records so far: {}", + records.incrementAndGet() + ); return lancamento; } } diff --git a/src/main/java/br/com/exemplo/dataingestion/domain/entities/Lancamento.java b/src/main/java/br/com/exemplo/dataingestion/domain/entities/Lancamento.java index 8d8e6f3..357bc1a 100644 --- a/src/main/java/br/com/exemplo/dataingestion/domain/entities/Lancamento.java +++ b/src/main/java/br/com/exemplo/dataingestion/domain/entities/Lancamento.java @@ -1,11 +1,15 @@ package br.com.exemplo.dataingestion.domain.entities; -import lombok.*; - -import java.util.List; import java.util.Map; import java.util.UUID; +import lombok.AllArgsConstructor; +import lombok.Builder; +import lombok.Getter; +import lombok.NoArgsConstructor; +import lombok.Setter; +import lombok.ToString; + @Getter @Setter @NoArgsConstructor diff --git a/src/main/java/br/com/exemplo/dataingestion/domain/producer/ProducerService.java b/src/main/java/br/com/exemplo/dataingestion/domain/producer/ProducerService.java index c2df14b..f3dce7b 100644 --- a/src/main/java/br/com/exemplo/dataingestion/domain/producer/ProducerService.java +++ b/src/main/java/br/com/exemplo/dataingestion/domain/producer/ProducerService.java @@ -1,6 +1,5 @@ package br.com.exemplo.dataingestion.domain.producer; -import br.com.exemplo.dataingestion.adapters.events.entities.DataLancamentoEvent; import br.com.exemplo.dataingestion.domain.entities.Lancamento; public interface ProducerService { diff --git a/src/main/java/br/com/exemplo/dataingestion/util/CreateLancamento.java b/src/main/java/br/com/exemplo/dataingestion/util/CreateLancamento.java index 8a41649..0e9fbda 100644 --- a/src/main/java/br/com/exemplo/dataingestion/util/CreateLancamento.java +++ b/src/main/java/br/com/exemplo/dataingestion/util/CreateLancamento.java @@ -1,28 +1,30 @@ package br.com.exemplo.dataingestion.util; -import br.com.exemplo.dataingestion.adapters.events.entities.ContaEvent; -import br.com.exemplo.dataingestion.adapters.events.entities.LancamentoEvent; -import br.com.exemplo.dataingestion.adapters.events.entities.Location; -import br.com.exemplo.dataingestion.domain.entities.Conta; -import br.com.exemplo.dataingestion.domain.entities.Lancamento; -import br.com.exemplo.dataingestion.domain.producer.ProducerService; -import com.github.javafaker.Faker; -import lombok.RequiredArgsConstructor; -import lombok.SneakyThrows; -import org.apache.commons.lang3.StringUtils; -import org.springframework.beans.factory.annotation.Value; -import org.springframework.stereotype.Component; - -import javax.annotation.PostConstruct; import java.math.BigDecimal; +import java.time.LocalDate; +import java.time.LocalTime; import java.time.OffsetDateTime; +import java.time.ZoneOffset; import java.time.format.DateTimeFormatter; import java.time.temporal.ChronoUnit; -import java.time.temporal.TemporalUnit; -import java.util.*; -import java.util.concurrent.ExecutorService; -import java.util.concurrent.Executors; -import java.util.concurrent.Future; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Random; +import java.util.UUID; + +import javax.annotation.PostConstruct; + +import com.github.javafaker.Faker; + +import org.springframework.beans.factory.annotation.Value; +import org.springframework.stereotype.Component; + +import br.com.exemplo.dataingestion.adapters.events.entities.Location; +import br.com.exemplo.dataingestion.domain.entities.Conta; +import br.com.exemplo.dataingestion.domain.entities.Lancamento; +import lombok.RequiredArgsConstructor; @Component @RequiredArgsConstructor @@ -30,7 +32,6 @@ public class CreateLancamento { private final Faker faker; - private ExecutorService executorService; private List lista = new ArrayList<>(); private Random random = new Random(); @Value("${processamento.threads.geracao.massa:10}") @@ -79,25 +80,36 @@ public Lancamento create123() .valorLancamento(BigDecimal.valueOf(1000.00).toString()) .build(); } - public Lancamento createWithParameter(UUID numeroConta, Random random, int dias) - { + + + public Lancamento createByContaAndData(UUID idConta, int dias, LocalDate dataFim) { + String data = OffsetDateTime.of( + dataFim, + LocalTime.MIDNIGHT, + ZoneOffset.ofHours(-3) + ).minus( + dias, + ChronoUnit.DAYS + ).format( + DateTimeFormatter.ISO_DATE_TIME + ); Map map = new HashMap<>(); map.put("nome",faker.name().fullName()); map.put("estabelecimento",faker.company().name()); map.put("location", Location.builder().lat(faker.address().latitude()).lon(faker.address().longitude()).build()); map.put("categoriaEstabelecimento",faker.commerce().department()); - return Lancamento.builder() + Lancamento lancamento = Lancamento.builder() .codigoMoedaTransacao("986") .codigoMotivoLancamento(String.valueOf(random.nextInt(999999))) .codigoTipoOperacao("TEF_CC_CC") .conta( Conta.builder() .codigoSufixoConta("100") - .numeroUnicoConta(numeroConta) + .numeroUnicoConta(idConta) .build() ) - .dataContabilLancamento(OffsetDateTime.now().minus(random.nextInt(dias), ChronoUnit.DAYS ).format(DateTimeFormatter.ISO_DATE_TIME)) - .dataLancamento(OffsetDateTime.now().format(DateTimeFormatter.ISO_DATE_TIME)) + .dataContabilLancamento(data) + .dataLancamento(data) .indicadorLancamentoCompulsorioOcorrencia(random.nextBoolean()) .metadados(map) .numeroIdentificacaoLancamentoConta(UUID.randomUUID()) @@ -105,18 +117,6 @@ public Lancamento createWithParameter(UUID numeroConta, Random random, int dias) .textoComplementoLancamento(faker.commerce().productName()+" "+lista.get(random.nextInt(lista.size()-1))) .valorLancamento(BigDecimal.valueOf(random.nextDouble()).toString()) .build(); + return lancamento; } - @SneakyThrows - public void create(int quantidadeRegistros, int quantidadeContas,int quantidadeDias,ProducerService producerService) - { - for(int i=0;i set = new HashSet<>(); FileOutputStream outputStream = new FileOutputStream("MyFile.txt"); OutputStreamWriter outputStreamWriter = new OutputStreamWriter(outputStream, "UTF-16"); BufferedWriter bufferedWriter = new BufferedWriter(outputStreamWriter);