From 948d3921517bdca6949101a0d7d4d75eaa124551 Mon Sep 17 00:00:00 2001 From: "NG\\kfiertek" Date: Thu, 17 Mar 2022 07:11:41 +0100 Subject: [PATCH 1/3] spark processor --- pom.xml | 42 ++++++ .../bimrocket/ihub/config/SparkConfig.java | 95 +++++++++++++ .../bimrocket/ihub/connector/Processor.java | 11 ++ .../spark/SparkBatchUpdateProcessor.java | 125 ++++++++++++++++++ .../spark/SparkBatchUpdateRunnable.java | 94 +++++++++++++ .../ihub/service/ConnectorMapperService.java | 1 + .../ihub/util/BatchUpdateRunnable.java | 111 ++++++++++++++++ src/main/resources/application.properties | 17 +++ 8 files changed, 496 insertions(+) create mode 100644 src/main/java/org/bimrocket/ihub/config/SparkConfig.java create mode 100644 src/main/java/org/bimrocket/ihub/processors/spark/SparkBatchUpdateProcessor.java create mode 100644 src/main/java/org/bimrocket/ihub/processors/spark/SparkBatchUpdateRunnable.java create mode 100644 src/main/java/org/bimrocket/ihub/util/BatchUpdateRunnable.java diff --git a/pom.xml b/pom.xml index 2ac9d89..b0a1f30 100644 --- a/pom.xml +++ b/pom.xml @@ -49,6 +49,48 @@ + + + org.codehaus.janino + commons-compiler + 3.0.8 + + + org.codehaus.janino + janino + 3.0.8 + + + org.apache.spark + spark-core_2.13 + 3.2.0 + + + org.slf4j + slf4j-log4j12 + + + org.codehaus.janino + commons-compiler + + + + + org.apache.spark + spark-sql_2.13 + 3.2.0 + + + org.slf4j + slf4j-log4j12 + + + org.codehaus.janino + commons-compiler + + + + org.springframework.boot diff --git a/src/main/java/org/bimrocket/ihub/config/SparkConfig.java b/src/main/java/org/bimrocket/ihub/config/SparkConfig.java new file mode 100644 index 0000000..c4b26eb --- /dev/null +++ b/src/main/java/org/bimrocket/ihub/config/SparkConfig.java @@ -0,0 +1,95 @@ +/** +BIMROCKET + +Copyright (C) 2022, Ajuntament de Sant Feliu de Llobregat + +This program is licensed and may be used, modified and redistributed under +the terms of the European Public License (EUPL), either version 1.1 or (at +your option) any later version as soon as they are approved by the European +Commission. + +Alternatively, you may redistribute and/or modify this program under the +terms of the GNU Lesser General Public License as published by the Free +Software Foundation; either version 3 of the License, or (at your option) +any later version. + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + +See the licenses for the specific language governing permissions, limitations +and more details. + +You should have received a copy of the EUPL1.1 and the LGPLv3 licenses along +with this program; if not, you may find them at: + +https://joinup.ec.europa.eu/software/page/eupl/licence-eupl +http://www.gnu.org/licenses/ +and +https://www.gnu.org/licenses/lgpl.txt +**/ +package org.bimrocket.ihub.config; + +import org.apache.spark.sql.SparkSession; +import org.springframework.beans.factory.annotation.Value; +import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty; +import org.springframework.context.annotation.Bean; +import org.springframework.context.annotation.Configuration; +import org.springframework.context.annotation.Primary; + +@Configuration +@ConditionalOnProperty(prefix = "ihub.spark", name = "enabled", havingValue = "true") +public class SparkConfig +{ + private static SparkSession session; + + + @Value("${ihub.spark.url}") + String sparkUrl; + + @Value("${ihub.spark.timezone}") + String sparkTimezone; + + @Value("${ihub.spark.driver.host}") + String sparkDriverHost; + + @Value("${ihub.spark.driver.port}") + String sparkDriverPort; + + @Value("${ihub.spark.blockmanager.port}") + String sparkBlockManagerPort; + + @Value("${ihub.spark.driver.memory}") + String sparkDriverMemory; + + @Value("${ihub.spark.deploy.mode}") + String sparkDeployMode; + + @Value("${ihub.spark.executor.cores}") + String sparkExecutorCores; + + @Value("${ihub.spark.executor.memory}") + String sparkExecutorMemory; + + @Bean + @Primary + public SparkSession buildContext() { + SparkConfig.session = SparkSession.builder() + .master(sparkUrl) + .config("spark.sql.session.timezone",sparkTimezone) + .config("spark.blockManager.port", sparkBlockManagerPort) + .config("spark.driver.port", sparkDriverPort) + .config("spark.driver.host", sparkDriverHost) + .config("spark.driver.memory", sparkDriverMemory) + .config("spark.driver.bindAddress", "0.0.0.0") + .config("spark.submit.deployMode", sparkDeployMode) + .config("spark.executor.core",sparkExecutorCores) + .config("spark.executor.memory", sparkExecutorMemory) + .getOrCreate(); + return SparkConfig.getSession(); + } + + public static SparkSession getSession() { + return SparkConfig.session; + } +} diff --git a/src/main/java/org/bimrocket/ihub/connector/Processor.java b/src/main/java/org/bimrocket/ihub/connector/Processor.java index c8bc713..bc4115a 100644 --- a/src/main/java/org/bimrocket/ihub/connector/Processor.java +++ b/src/main/java/org/bimrocket/ihub/connector/Processor.java @@ -31,6 +31,8 @@ package org.bimrocket.ihub.connector; +import java.util.Properties; + /** * * @author realor @@ -40,6 +42,7 @@ public abstract class Processor private Connector connector; protected String description; protected boolean enabled; + protected Properties props = new Properties(); public Processor() { @@ -80,6 +83,14 @@ public void setEnabled(boolean enabled) { this.enabled = enabled; } + + public Properties getProperties() { + return this.props; + } + + public void setProperty(String key, String value) { + this.props.setProperty(key, value); + } /** diff --git a/src/main/java/org/bimrocket/ihub/processors/spark/SparkBatchUpdateProcessor.java b/src/main/java/org/bimrocket/ihub/processors/spark/SparkBatchUpdateProcessor.java new file mode 100644 index 0000000..46ac29c --- /dev/null +++ b/src/main/java/org/bimrocket/ihub/processors/spark/SparkBatchUpdateProcessor.java @@ -0,0 +1,125 @@ +/** +BIMROCKET + +Copyright (C) 2022, Ajuntament de Sant Feliu de Llobregat + +This program is licensed and may be used, modified and redistributed under +the terms of the European Public License (EUPL), either version 1.1 or (at +your option) any later version as soon as they are approved by the European +Commission. + +Alternatively, you may redistribute and/or modify this program under the +terms of the GNU Lesser General Public License as published by the Free +Software Foundation; either version 3 of the License, or (at your option) +any later version. + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + +See the licenses for the specific language governing permissions, limitations +and more details. + +You should have received a copy of the EUPL1.1 and the LGPLv3 licenses along +with this program; if not, you may find them at: + +https://joinup.ec.europa.eu/software/page/eupl/licence-eupl +http://www.gnu.org/licenses/ +and +https://www.gnu.org/licenses/lgpl.txt +**/ +package org.bimrocket.ihub.processors.spark; + +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; + +import org.bimrocket.ihub.connector.ProcessedObject; +import org.bimrocket.ihub.processors.Sender; +import org.bimrocket.ihub.util.ConfigProperty; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import com.fasterxml.jackson.databind.JsonNode; +import com.fasterxml.jackson.databind.ObjectMapper; +import com.fasterxml.jackson.databind.node.ArrayNode; + +public class SparkBatchUpdateProcessor extends Sender +{ + private static final Logger log = + LoggerFactory.getLogger(SparkBatchUpdateProcessor.class); + + public static final String PROPS_FORMAT = "format"; + public static final String PROPS_MODE = "mode"; + public static final String PROPS_FILE_SYSTEM_PATH = "fs.path"; + public static final String PROPS_FILE_NAME= "file.name"; + public static final String PROPS_UPDATEABLE_LENGTH = "updateable.length"; + + + @ConfigProperty(name = SparkBatchUpdateProcessor.PROPS_FORMAT, + description = "Spark Format to write in.") + public String format = "parquet"; + + @ConfigProperty(name = SparkBatchUpdateProcessor.PROPS_MODE, + description = "Spark write mode to write in") + public String mode = "append"; + + @ConfigProperty(name = SparkBatchUpdateProcessor.PROPS_FILE_SYSTEM_PATH, + description = "Path to storage directory can be hdfs (hdfs://server/) or normal fs", required = false) + public String fsPath = "C:\\"; + + @ConfigProperty(name = SparkBatchUpdateProcessor.PROPS_FILE_NAME, + description = "Name of file to write in") + public String fileName; + + @ConfigProperty(name = SparkBatchUpdateProcessor.PROPS_UPDATEABLE_LENGTH, + description = "Amount of records to perform update/save", required = false) + public Integer updateAmount = 50; + + private ArrayNode all; + private ExecutorService executor; + private SparkBatchUpdateRunnable batchUpdater; + + + @Override + public void init() throws Exception + { + super.init(); + ObjectMapper mapper = new ObjectMapper(); + this.all = mapper.createArrayNode(); + this.executor = Executors.newSingleThreadExecutor(); + this.batchUpdater = new SparkBatchUpdateRunnable(this.all, this.getProperties()); + executor.submit(this.batchUpdater); + } + + @Override + public boolean processObject(ProcessedObject procObject) + { + if (procObject.isIgnore()) { + this.batchUpdater.runSave(); + return false; + } + + JsonNode toSend = procObject.getGlobalObject() != null ? procObject.getGlobalObject() : procObject.getLocalObject(); + if (toSend == null) + { + this.batchUpdater.runSave(); + return false; + } + + all.add(toSend); + synchronized (this) { + executor.notifyAll(); + } + log.debug("adding {} json object to all elements array", toSend.toPrettyString()); + + return true; + } + + @Override + public void end() { + log.debug("ending SparkBatchUpdaterProcessor"); + super.end(); + executor.shutdown(); + } + +} \ No newline at end of file diff --git a/src/main/java/org/bimrocket/ihub/processors/spark/SparkBatchUpdateRunnable.java b/src/main/java/org/bimrocket/ihub/processors/spark/SparkBatchUpdateRunnable.java new file mode 100644 index 0000000..b0681f4 --- /dev/null +++ b/src/main/java/org/bimrocket/ihub/processors/spark/SparkBatchUpdateRunnable.java @@ -0,0 +1,94 @@ +/** +BIMROCKET + +Copyright (C) 2022, Ajuntament de Sant Feliu de Llobregat + +This program is licensed and may be used, modified and redistributed under +the terms of the European Public License (EUPL), either version 1.1 or (at +your option) any later version as soon as they are approved by the European +Commission. + +Alternatively, you may redistribute and/or modify this program under the +terms of the GNU Lesser General Public License as published by the Free +Software Foundation; either version 3 of the License, or (at your option) +any later version. + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + +See the licenses for the specific language governing permissions, limitations +and more details. + +You should have received a copy of the EUPL1.1 and the LGPLv3 licenses along +with this program; if not, you may find them at: + +https://joinup.ec.europa.eu/software/page/eupl/licence-eupl +http://www.gnu.org/licenses/ +and +https://www.gnu.org/licenses/lgpl.txt +**/ +package org.bimrocket.ihub.processors.spark; + +import java.io.IOException; +import java.util.Arrays; +import java.util.List; +import java.util.Properties; +import java.util.UUID; + +import org.apache.spark.SparkContext; +import org.apache.spark.sql.Dataset; +import org.apache.spark.sql.Encoders; +import org.apache.spark.sql.SparkSession; +import org.bimrocket.ihub.config.SparkConfig; +import org.bimrocket.ihub.util.BatchUpdateRunnable; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import com.fasterxml.jackson.core.JsonProcessingException; +import com.fasterxml.jackson.databind.ObjectMapper; +import com.fasterxml.jackson.databind.node.ArrayNode; + +public class SparkBatchUpdateRunnable extends BatchUpdateRunnable +{ + private static final Logger log = LoggerFactory + .getLogger(SparkBatchUpdateRunnable.class); + + SparkSession session; + SparkContext context; + Properties sparkProps; + + public SparkBatchUpdateRunnable(ArrayNode all, Properties sparkProps) + { + super(all, Integer + .valueOf((String) sparkProps.get(SparkBatchUpdateProcessor.PROPS_UPDATEABLE_LENGTH))); + this.sparkProps = sparkProps; + this.session = SparkConfig.getSession(); + this.context = this.session.sparkContext(); + } + + @Override + protected void batchUpdate() + { + String json = null; + try + { + json = mapper.writeValueAsString(all); + } + catch (JsonProcessingException e) + { + log.error("While transforming all ArrayNode" + + " to json String Exception has occurred ", e); + } + if (json == null) + return; + List data = Arrays.asList(json); + + Dataset ds = session.createDataset(data, Encoders.STRING()); + + ds.write().mode((String) sparkProps.get(SparkBatchUpdateProcessor.PROPS_MODE)) + .format((String) sparkProps.get(SparkBatchUpdateProcessor.PROPS_FORMAT)) + .save((String) sparkProps.get(SparkBatchUpdateProcessor.PROPS_FILE_SYSTEM_PATH) + + (String) sparkProps.get(SparkBatchUpdateProcessor.PROPS_FILE_NAME)); + } +} \ No newline at end of file diff --git a/src/main/java/org/bimrocket/ihub/service/ConnectorMapperService.java b/src/main/java/org/bimrocket/ihub/service/ConnectorMapperService.java index a64fc23..7937886 100644 --- a/src/main/java/org/bimrocket/ihub/service/ConnectorMapperService.java +++ b/src/main/java/org/bimrocket/ihub/service/ConnectorMapperService.java @@ -241,6 +241,7 @@ public Map getProcessorProperties(Processor processor) String propertyName = propHandler.getName(); try { + processor.setProperty(propertyName, propHandler.getValue(processor).toString()); properties.put(propertyName, propHandler.getValue(processor)); } catch (Exception ex) diff --git a/src/main/java/org/bimrocket/ihub/util/BatchUpdateRunnable.java b/src/main/java/org/bimrocket/ihub/util/BatchUpdateRunnable.java new file mode 100644 index 0000000..f31e224 --- /dev/null +++ b/src/main/java/org/bimrocket/ihub/util/BatchUpdateRunnable.java @@ -0,0 +1,111 @@ +/** +BIMROCKET + +Copyright (C) 2022, Ajuntament de Sant Feliu de Llobregat + +This program is licensed and may be used, modified and redistributed under +the terms of the European Public License (EUPL), either version 1.1 or (at +your option) any later version as soon as they are approved by the European +Commission. + +Alternatively, you may redistribute and/or modify this program under the +terms of the GNU Lesser General Public License as published by the Free +Software Foundation; either version 3 of the License, or (at your option) +any later version. + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + +See the licenses for the specific language governing permissions, limitations +and more details. + +You should have received a copy of the EUPL1.1 and the LGPLv3 licenses along +with this program; if not, you may find them at: + +https://joinup.ec.europa.eu/software/page/eupl/licence-eupl +http://www.gnu.org/licenses/ +and +https://www.gnu.org/licenses/lgpl.txt +**/ +package org.bimrocket.ihub.util; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import com.fasterxml.jackson.databind.ObjectMapper; +import com.fasterxml.jackson.databind.node.ArrayNode; + +/** + * + * @author kfiertek-nexus-geographics + * + */ +public abstract class BatchUpdateRunnable implements Runnable +{ + private static final Logger log = LoggerFactory + .getLogger(BatchUpdateRunnable.class); + + protected ArrayNode all; + protected int saveableSize; + protected boolean runCheck; + protected boolean saveCheck; + protected ObjectMapper mapper; + + protected BatchUpdateRunnable(ArrayNode all, int saveableSize) + { + this.all = all; + this.saveableSize = saveableSize; + this.runCheck = true; + this.saveCheck = false; + this.mapper = new ObjectMapper(); + } + + @Override + public void run() + { + while (runCheck) + { + if ((all.size() >= this.saveableSize) || saveCheck) + { + this.save(); + } + synchronized (this) + { + try + { + wait(); + } + catch (InterruptedException e) + { + log.error("Exception occurred while waiting ::", e); + } + + } + + } + log.debug("BatchWriterRunnable end"); + } + + public void runSave() { + this.saveCheck = true; + } + + protected void stop() { + this.runCheck = false; + } + + protected void save() + { + this.batchUpdate(); + this.cleanUp(); + } + + protected abstract void batchUpdate(); + + private void cleanUp() + { + this.all.removeAll(); + this.saveCheck = false; + } +} diff --git a/src/main/resources/application.properties b/src/main/resources/application.properties index 56f31d2..e533e0a 100644 --- a/src/main/resources/application.properties +++ b/src/main/resources/application.properties @@ -1,3 +1,20 @@ +#ihub config +ihub.spark.enabled=${IHUB.SPARK.ENABLED:#{"true"}} + +# spark context configuration +ihub.spark.url=${IHUB.SPARK.URL:#{"local[*]"}} +ihub.spark.timezone=${IHUB.SPARK.TIMEZONE:#{"UTC"}} +ihub.spark.driver.host=${IHUB.SPARK.DRIVER.HOST:#{"127.0.0.1"}} +ihub.spark.driver.port=${IHUB.SPARK.DRIVER.PORT:#{"10027"}} +ihub.spark.driver.memory=${IHUB.SPARK.DRIVER.MEMORY:#{"4G"}} +ihub.spark.blockmanager.port=${IHUB.SPARK.BLOCKMANAGER.PORT:#{"10028"}} +ihub.spark.deploy.mode=${IHUB.SPARK.DEPLOY.MODE:#{"cluster"}} + +#spark executor configuration +ihub.spark.executor.cores=${IHUB.SPARK.EXECUTOR.CORES:#{"2"}} +ihub.spark.executor.memory=${IHUB.SPARK.EXECUTOR.MEMORY:#{"2G"}} + + # security spring.security.user.name=admin spring.security.user.password=admin From c04aeaf337c8c26999300c11301ce31c123d9978 Mon Sep 17 00:00:00 2001 From: "NG\\kfiertek" Date: Tue, 22 Mar 2022 16:00:17 +0100 Subject: [PATCH 2/3] changes to Spark processor --- .../bimrocket/ihub/config/SparkConfig.java | 104 ++++++++++------ .../bimrocket/ihub/connector/Connector.java | 105 ++++++++--------- .../bimrocket/ihub/connector/Processor.java | 4 + .../processors/kafka/JsonKafkaSender.java | 2 +- .../ihub/processors/kafka/KafkaLoader.java | 2 +- .../spark/SparkBatchUpdateRunnable.java | 94 --------------- ...hUpdateProcessor.java => SparkWriter.java} | 96 +++++++++++---- .../ihub/service/ConnectorMapperService.java | 2 +- .../ihub/util/BatchUpdateRunnable.java | 111 ------------------ src/main/resources/application.properties | 2 +- 10 files changed, 198 insertions(+), 324 deletions(-) delete mode 100644 src/main/java/org/bimrocket/ihub/processors/spark/SparkBatchUpdateRunnable.java rename src/main/java/org/bimrocket/ihub/processors/spark/{SparkBatchUpdateProcessor.java => SparkWriter.java} (58%) delete mode 100644 src/main/java/org/bimrocket/ihub/util/BatchUpdateRunnable.java diff --git a/src/main/java/org/bimrocket/ihub/config/SparkConfig.java b/src/main/java/org/bimrocket/ihub/config/SparkConfig.java index c4b26eb..5d06d6e 100644 --- a/src/main/java/org/bimrocket/ihub/config/SparkConfig.java +++ b/src/main/java/org/bimrocket/ihub/config/SparkConfig.java @@ -32,48 +32,32 @@ the terms of the European Public License (EUPL), either version 1.1 or (at import org.apache.spark.sql.SparkSession; import org.springframework.beans.factory.annotation.Value; -import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty; -import org.springframework.context.annotation.Bean; +import org.springframework.boot.context.event.ApplicationReadyEvent; import org.springframework.context.annotation.Configuration; -import org.springframework.context.annotation.Primary; @Configuration -@ConditionalOnProperty(prefix = "ihub.spark", name = "enabled", havingValue = "true") public class SparkConfig { private static SparkSession session; - - - @Value("${ihub.spark.url}") - String sparkUrl; - - @Value("${ihub.spark.timezone}") - String sparkTimezone; - - @Value("${ihub.spark.driver.host}") - String sparkDriverHost; - - @Value("${ihub.spark.driver.port}") - String sparkDriverPort; - - @Value("${ihub.spark.blockmanager.port}") - String sparkBlockManagerPort; - - @Value("${ihub.spark.driver.memory}") - String sparkDriverMemory; - - @Value("${ihub.spark.deploy.mode}") - String sparkDeployMode; - @Value("${ihub.spark.executor.cores}") - String sparkExecutorCores; - - @Value("${ihub.spark.executor.memory}") - String sparkExecutorMemory; + private static boolean enabled; + private static String sparkUrl; + private static String sparkTimezone; + private static String sparkDriverHost; + private static String sparkDriverPort; + private static String sparkBlockManagerPort; + private static String sparkDriverMemory; + private static String sparkDeployMode; + private static String sparkExecutorCores; + private static String sparkExecutorMemory; - @Bean - @Primary - public SparkSession buildContext() { + public static SparkSession buildContext() throws Exception { + if (SparkConfig.session != null) { + return SparkConfig.getSession(); + } + if (!SparkConfig.enabled) { + throw new Exception("Spark not enabled"); + } SparkConfig.session = SparkSession.builder() .master(sparkUrl) .config("spark.sql.session.timezone",sparkTimezone) @@ -92,4 +76,56 @@ public SparkSession buildContext() { public static SparkSession getSession() { return SparkConfig.session; } + + @Value("${ihub.spark.enabled}") + public void setEnabled(String enabled) { + SparkConfig.enabled = Boolean.valueOf(enabled); + }; + + @Value("${ihub.spark.url}") + public void setUrl(String url) { + SparkConfig.sparkUrl = url; + }; + + @Value("${ihub.spark.timezone}") + public void setTimezone(String sparkTimezone) { + SparkConfig.sparkTimezone = sparkTimezone; + }; + + @Value("${ihub.spark.driver.host}") + public void setDriverHost(String sparkDriverHost) { + SparkConfig.sparkDriverHost = sparkDriverHost; + }; + + @Value("${ihub.spark.driver.port}") + public void setDriverPort(String sparkDriverPort) { + SparkConfig.sparkDriverPort = sparkDriverPort; + }; + + @Value("${ihub.spark.blockmanager.port}") + public void setBlockmanagerPort(String sparkBlockManagerPort) { + SparkConfig.sparkBlockManagerPort = sparkBlockManagerPort; + }; + + @Value("${ihub.spark.driver.memory}") + public void setDriverMemory(String sparkDriverMemory) { + SparkConfig.sparkDriverMemory = sparkDriverMemory; + }; + + @Value("${ihub.spark.deploy.mode}") + public void setDeployMode(String sparkDeployMode) { + SparkConfig.sparkDeployMode = sparkDeployMode; + }; + + @Value("${ihub.spark.executor.cores}") + public void setExecutorCores(String sparkExecutorCores) { + SparkConfig.sparkExecutorCores = sparkExecutorCores; + }; + + @Value("${ihub.spark.executor.memory}") + public void setExecutorMemory(String sparkExecutorMemory) { + SparkConfig.sparkExecutorMemory = sparkExecutorMemory; + }; + + } diff --git a/src/main/java/org/bimrocket/ihub/connector/Connector.java b/src/main/java/org/bimrocket/ihub/connector/Connector.java index c68ffb7..dc52af2 100644 --- a/src/main/java/org/bimrocket/ihub/connector/Connector.java +++ b/src/main/java/org/bimrocket/ihub/connector/Connector.java @@ -53,8 +53,7 @@ */ public class Connector implements Runnable { - private static final Logger log = - LoggerFactory.getLogger(Connector.class); + private static final Logger log = LoggerFactory.getLogger(Connector.class); public static final String RUNNING_STATUS = "RUNNING"; public static final String STOPPED_STATUS = "STOPPED"; @@ -264,25 +263,14 @@ public String toString() { StringBuilder buffer = new StringBuilder(); - return buffer.append("Connector") - .append("{ name: \"") - .append(name) - .append("\", description: \"") - .append(description == null ? "" : description) - .append("\", inventory: \"") - .append(inventory) - .append("\", status: ") - .append(getStatus()) - .append(", autoStart: ") - .append(autoStart) - .append(", singleRun: ") - .append(singleRun) - .append(", debug: ") - .append(debugEnabled) - .append(", processors: ") - .append(processors) - .append(" }") - .toString(); + return buffer.append("Connector").append("{ name: \"").append(name) + .append("\", description: \"") + .append(description == null ? "" : description) + .append("\", inventory: \"").append(inventory).append("\", status: ") + .append(getStatus()).append(", autoStart: ").append(autoStart) + .append(", singleRun: ").append(singleRun).append(", debug: ") + .append(debugEnabled).append(", processors: ").append(processors) + .append(" }").toString(); } @Override @@ -301,8 +289,8 @@ public void run() { if (!initProcessors(runningProcessors)) throw new ProcessorInitException(427, - "Failed to initialize processor %s: %s", name, - lastError.getMessage()); + "Failed to initialize processor %s: %s", name, + lastError.getMessage()); while (!end) { @@ -313,14 +301,13 @@ public void run() { if (processor.isEnabled()) { - log.debug("Executing processor {}", - processor.getClass().getName()); - + log.debug("Executing processor {}", processor.getClass().getName()); if (processor.processObject(procObject)) { processorCount++; } - else break; + else + break; } } @@ -328,14 +315,20 @@ public void run() { updateIdPairRepository(procObject); updateStatistics(procObject); - log.debug("Object processed, type: {}, operation: {}, " - + "localId: {}, globalId: {}", - procObject.getObjectType(), procObject.getOperation(), - procObject.getLocalId(), procObject.getGlobalId()); + log.debug( + "Object processed, type: {}, operation: {}, " + + "localId: {}, globalId: {}", + procObject.getObjectType(), procObject.getOperation(), + procObject.getLocalId(), procObject.getGlobalId()); } if (processorCount == 0) { + for (var processor : runningProcessors) + { + if (processor.isEnabled()) + processor.afterProcessing(); + } if (singleRun) { end = true; @@ -356,12 +349,13 @@ public void run() catch (ProcessorInitException ex) { lastError = ex; - log.error(ex.getMessage()); + log.error("ProcessorInitException message :: {}, stacktrace ::", + ex.getMessage(), ex); } catch (Exception ex) { lastError = ex; - log.error("An error has ocurred: {}", ex.toString()); + log.error("An error has ocurred: {}", ex.toString(), ex); } finally { @@ -405,7 +399,7 @@ public Connector save() public ConnectorSetup saveSetup() { ConnectorSetup connSetup = service.getConnectorMapperService() - .getConnectorSetup(this); + .getConnectorSetup(this); service.getConnectorSetupRepository().save(connSetup); @@ -428,12 +422,12 @@ public Connector restore() throws Exception public ConnectorSetup restoreSetup() throws Exception { Optional optConnSetup = service - .getConnectorSetupRepository().findById(name); + .getConnectorSetupRepository().findById(name); if (optConnSetup.isPresent()) { ConnectorSetup connSetup = optConnSetup.get(); service.getConnectorMapperService().setConnectorSetup(this, connSetup, - true); + true); unsaved = false; lastError = null; @@ -451,14 +445,14 @@ protected boolean initProcessors(List processors) { processor.init(); log.debug("Processor #{}: {} initialized.", initialized, - processor.getClass().getName()); + processor.getClass().getName()); initialized++; } catch (Exception ex) { log.debug("Failed to initialize processor #{}: {}: {}", initialized, - processor.getClass().getName(), ex.toString()); + processor.getClass().getName(), ex.toString(), ex); lastError = ex; break; } @@ -485,16 +479,17 @@ public void endProcessors(List processors) { processor.end(); log.debug("Processor #{}: {} ended.", ended, - processor.getClass().getName()); + processor.getClass().getName()); ended++; } catch (Exception ex) { log.debug("Failed to end processor #{}: {}: {}", ended, - processor.getClass().getName(), ex.toString()); + processor.getClass().getName(), ex.toString()); - if (lastError == null) lastError = ex; + if (lastError == null) + lastError = ex; } } } @@ -503,9 +498,9 @@ void updateIdPairRepository(ProcessedObject procObject) { IdPairRepository idPairRepository = service.getIdPairRepository(); - Optional result = idPairRepository. - findByInventoryAndObjectTypeAndLocalId(inventory, - procObject.getObjectType(), procObject.getLocalId()); + Optional result = idPairRepository + .findByInventoryAndObjectTypeAndLocalId(inventory, + procObject.getObjectType(), procObject.getLocalId()); if (procObject.isDelete()) { @@ -552,17 +547,17 @@ void updateStatistics(ProcessedObject procObject) processed++; switch (procObject.getOperation()) { - case INSERT: - inserted++; - break; - case UPDATE: - updated++; - break; - case DELETE: - deleted++; - break; - default: - ignored++; + case INSERT: + inserted++; + break; + case UPDATE: + updated++; + break; + case DELETE: + deleted++; + break; + default: + ignored++; } } } diff --git a/src/main/java/org/bimrocket/ihub/connector/Processor.java b/src/main/java/org/bimrocket/ihub/connector/Processor.java index bc4115a..35e4ad3 100644 --- a/src/main/java/org/bimrocket/ihub/connector/Processor.java +++ b/src/main/java/org/bimrocket/ihub/connector/Processor.java @@ -121,6 +121,10 @@ public void init() throws Exception public void end() { } + + public void afterProcessing() { + + } @Override public String toString() diff --git a/src/main/java/org/bimrocket/ihub/processors/kafka/JsonKafkaSender.java b/src/main/java/org/bimrocket/ihub/processors/kafka/JsonKafkaSender.java index ed13e22..b103d80 100644 --- a/src/main/java/org/bimrocket/ihub/processors/kafka/JsonKafkaSender.java +++ b/src/main/java/org/bimrocket/ihub/processors/kafka/JsonKafkaSender.java @@ -47,7 +47,7 @@ public class JsonKafkaSender extends KafkaSender @Override public boolean processObject(ProcessedObject procObject) { - JsonNode toSend = procObject.getGlobalObject(); + JsonNode toSend = procObject.getGlobalObject() != null ? procObject.getGlobalObject() : procObject.getLocalObject(); if (toSend == null) { return false; diff --git a/src/main/java/org/bimrocket/ihub/processors/kafka/KafkaLoader.java b/src/main/java/org/bimrocket/ihub/processors/kafka/KafkaLoader.java index 2c17c1d..e049403 100644 --- a/src/main/java/org/bimrocket/ihub/processors/kafka/KafkaLoader.java +++ b/src/main/java/org/bimrocket/ihub/processors/kafka/KafkaLoader.java @@ -50,7 +50,7 @@ public abstract class KafkaLoader extends Loader description = "Kafka bootstrap servers address") public String bootstrapAddress; - @ConfigProperty(name = "groudId", + @ConfigProperty(name = "groupId", description = "Number of the kafka group to which the loader belongs.") public String groupId; diff --git a/src/main/java/org/bimrocket/ihub/processors/spark/SparkBatchUpdateRunnable.java b/src/main/java/org/bimrocket/ihub/processors/spark/SparkBatchUpdateRunnable.java deleted file mode 100644 index b0681f4..0000000 --- a/src/main/java/org/bimrocket/ihub/processors/spark/SparkBatchUpdateRunnable.java +++ /dev/null @@ -1,94 +0,0 @@ -/** -BIMROCKET - -Copyright (C) 2022, Ajuntament de Sant Feliu de Llobregat - -This program is licensed and may be used, modified and redistributed under -the terms of the European Public License (EUPL), either version 1.1 or (at -your option) any later version as soon as they are approved by the European -Commission. - -Alternatively, you may redistribute and/or modify this program under the -terms of the GNU Lesser General Public License as published by the Free -Software Foundation; either version 3 of the License, or (at your option) -any later version. - -Unless required by applicable law or agreed to in writing, software -distributed under the License is distributed on an "AS IS" BASIS, WITHOUT -WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - -See the licenses for the specific language governing permissions, limitations -and more details. - -You should have received a copy of the EUPL1.1 and the LGPLv3 licenses along -with this program; if not, you may find them at: - -https://joinup.ec.europa.eu/software/page/eupl/licence-eupl -http://www.gnu.org/licenses/ -and -https://www.gnu.org/licenses/lgpl.txt -**/ -package org.bimrocket.ihub.processors.spark; - -import java.io.IOException; -import java.util.Arrays; -import java.util.List; -import java.util.Properties; -import java.util.UUID; - -import org.apache.spark.SparkContext; -import org.apache.spark.sql.Dataset; -import org.apache.spark.sql.Encoders; -import org.apache.spark.sql.SparkSession; -import org.bimrocket.ihub.config.SparkConfig; -import org.bimrocket.ihub.util.BatchUpdateRunnable; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import com.fasterxml.jackson.core.JsonProcessingException; -import com.fasterxml.jackson.databind.ObjectMapper; -import com.fasterxml.jackson.databind.node.ArrayNode; - -public class SparkBatchUpdateRunnable extends BatchUpdateRunnable -{ - private static final Logger log = LoggerFactory - .getLogger(SparkBatchUpdateRunnable.class); - - SparkSession session; - SparkContext context; - Properties sparkProps; - - public SparkBatchUpdateRunnable(ArrayNode all, Properties sparkProps) - { - super(all, Integer - .valueOf((String) sparkProps.get(SparkBatchUpdateProcessor.PROPS_UPDATEABLE_LENGTH))); - this.sparkProps = sparkProps; - this.session = SparkConfig.getSession(); - this.context = this.session.sparkContext(); - } - - @Override - protected void batchUpdate() - { - String json = null; - try - { - json = mapper.writeValueAsString(all); - } - catch (JsonProcessingException e) - { - log.error("While transforming all ArrayNode" - + " to json String Exception has occurred ", e); - } - if (json == null) - return; - List data = Arrays.asList(json); - - Dataset ds = session.createDataset(data, Encoders.STRING()); - - ds.write().mode((String) sparkProps.get(SparkBatchUpdateProcessor.PROPS_MODE)) - .format((String) sparkProps.get(SparkBatchUpdateProcessor.PROPS_FORMAT)) - .save((String) sparkProps.get(SparkBatchUpdateProcessor.PROPS_FILE_SYSTEM_PATH) - + (String) sparkProps.get(SparkBatchUpdateProcessor.PROPS_FILE_NAME)); - } -} \ No newline at end of file diff --git a/src/main/java/org/bimrocket/ihub/processors/spark/SparkBatchUpdateProcessor.java b/src/main/java/org/bimrocket/ihub/processors/spark/SparkWriter.java similarity index 58% rename from src/main/java/org/bimrocket/ihub/processors/spark/SparkBatchUpdateProcessor.java rename to src/main/java/org/bimrocket/ihub/processors/spark/SparkWriter.java index 46ac29c..e36ad7c 100644 --- a/src/main/java/org/bimrocket/ihub/processors/spark/SparkBatchUpdateProcessor.java +++ b/src/main/java/org/bimrocket/ihub/processors/spark/SparkWriter.java @@ -30,96 +30,140 @@ the terms of the European Public License (EUPL), either version 1.1 or (at **/ package org.bimrocket.ihub.processors.spark; +import java.util.Arrays; +import java.util.List; +import java.util.Properties; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; +import org.apache.spark.SparkContext; +import org.apache.spark.sql.Dataset; +import org.apache.spark.sql.Encoders; +import org.apache.spark.sql.SparkSession; +import org.bimrocket.ihub.config.SparkConfig; import org.bimrocket.ihub.connector.ProcessedObject; import org.bimrocket.ihub.processors.Sender; import org.bimrocket.ihub.util.ConfigProperty; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import com.fasterxml.jackson.core.JsonProcessingException; import com.fasterxml.jackson.databind.JsonNode; import com.fasterxml.jackson.databind.ObjectMapper; import com.fasterxml.jackson.databind.node.ArrayNode; -public class SparkBatchUpdateProcessor extends Sender +public class SparkWriter extends Sender { private static final Logger log = - LoggerFactory.getLogger(SparkBatchUpdateProcessor.class); + LoggerFactory.getLogger(SparkWriter.class); public static final String PROPS_FORMAT = "format"; public static final String PROPS_MODE = "mode"; - public static final String PROPS_FILE_SYSTEM_PATH = "fs.path"; - public static final String PROPS_FILE_NAME= "file.name"; - public static final String PROPS_UPDATEABLE_LENGTH = "updateable.length"; + public static final String PROPS_FILE_SYSTEM_PATH = "fsPath"; + public static final String PROPS_FILE_NAME= "fileName"; + public static final String PROPS_UPDATEABLE_LENGTH = "updateableLength"; - @ConfigProperty(name = SparkBatchUpdateProcessor.PROPS_FORMAT, + @ConfigProperty(name = SparkWriter.PROPS_FORMAT, description = "Spark Format to write in.") public String format = "parquet"; - @ConfigProperty(name = SparkBatchUpdateProcessor.PROPS_MODE, + @ConfigProperty(name = SparkWriter.PROPS_MODE, description = "Spark write mode to write in") public String mode = "append"; - @ConfigProperty(name = SparkBatchUpdateProcessor.PROPS_FILE_SYSTEM_PATH, + @ConfigProperty(name = SparkWriter.PROPS_FILE_SYSTEM_PATH, description = "Path to storage directory can be hdfs (hdfs://server/) or normal fs", required = false) public String fsPath = "C:\\"; - @ConfigProperty(name = SparkBatchUpdateProcessor.PROPS_FILE_NAME, + @ConfigProperty(name = SparkWriter.PROPS_FILE_NAME, description = "Name of file to write in") public String fileName; - @ConfigProperty(name = SparkBatchUpdateProcessor.PROPS_UPDATEABLE_LENGTH, + @ConfigProperty(name = SparkWriter.PROPS_UPDATEABLE_LENGTH, description = "Amount of records to perform update/save", required = false) public Integer updateAmount = 50; private ArrayNode all; - private ExecutorService executor; - private SparkBatchUpdateRunnable batchUpdater; + private SparkSession session; + private SparkContext context; + private ObjectMapper mapper; + private boolean save; @Override public void init() throws Exception { super.init(); - ObjectMapper mapper = new ObjectMapper(); + this.save = false; + this.mapper = new ObjectMapper(); this.all = mapper.createArrayNode(); - this.executor = Executors.newSingleThreadExecutor(); - this.batchUpdater = new SparkBatchUpdateRunnable(this.all, this.getProperties()); - executor.submit(this.batchUpdater); + this.session = SparkConfig.buildContext(); + this.context = this.session.sparkContext(); } @Override public boolean processObject(ProcessedObject procObject) { if (procObject.isIgnore()) { - this.batchUpdater.runSave(); + this.batchUpdate(); return false; } JsonNode toSend = procObject.getGlobalObject() != null ? procObject.getGlobalObject() : procObject.getLocalObject(); if (toSend == null) { - this.batchUpdater.runSave(); + this.batchUpdate(); return false; } all.add(toSend); - synchronized (this) { - executor.notifyAll(); - } + this.batchUpdate(); log.debug("adding {} json object to all elements array", toSend.toPrettyString()); return true; } - @Override - public void end() { - log.debug("ending SparkBatchUpdaterProcessor"); - super.end(); - executor.shutdown(); + private void runSave() { + this.save = true; + this.batchUpdate(); + this.cleanUp(); + } + + protected void batchUpdate() + { + if ((all.size() > 0 && save) || all.size() > updateAmount) { + String json = null; + try + { + json = mapper.writeValueAsString(all); + } + catch (JsonProcessingException e) + { + log.error("While transforming all ArrayNode" + + " to json String Exception has occurred ", e); + } + if (json == null) + return; + List data = Arrays.asList(json); + + Dataset ds = session.createDataset(data, Encoders.STRING()); + + ds.write().mode(mode) + .format(format) + .save(fsPath + + fileName); + } + + } + + private void cleanUp() { + this.save = false; + this.all.removeAll(); } + @Override + public void afterProcessing() { + this.runSave(); + } } \ No newline at end of file diff --git a/src/main/java/org/bimrocket/ihub/service/ConnectorMapperService.java b/src/main/java/org/bimrocket/ihub/service/ConnectorMapperService.java index 7937886..bbd8a85 100644 --- a/src/main/java/org/bimrocket/ihub/service/ConnectorMapperService.java +++ b/src/main/java/org/bimrocket/ihub/service/ConnectorMapperService.java @@ -241,7 +241,6 @@ public Map getProcessorProperties(Processor processor) String propertyName = propHandler.getName(); try { - processor.setProperty(propertyName, propHandler.getValue(processor).toString()); properties.put(propertyName, propHandler.getValue(processor)); } catch (Exception ex) @@ -279,6 +278,7 @@ public void setProcessorProperties(Processor processor, try { + processor.setProperty(propertyName, propertyValue.toString()); propHandler.setValue(processor, propertyValue); } catch (Exception ex) diff --git a/src/main/java/org/bimrocket/ihub/util/BatchUpdateRunnable.java b/src/main/java/org/bimrocket/ihub/util/BatchUpdateRunnable.java deleted file mode 100644 index f31e224..0000000 --- a/src/main/java/org/bimrocket/ihub/util/BatchUpdateRunnable.java +++ /dev/null @@ -1,111 +0,0 @@ -/** -BIMROCKET - -Copyright (C) 2022, Ajuntament de Sant Feliu de Llobregat - -This program is licensed and may be used, modified and redistributed under -the terms of the European Public License (EUPL), either version 1.1 or (at -your option) any later version as soon as they are approved by the European -Commission. - -Alternatively, you may redistribute and/or modify this program under the -terms of the GNU Lesser General Public License as published by the Free -Software Foundation; either version 3 of the License, or (at your option) -any later version. - -Unless required by applicable law or agreed to in writing, software -distributed under the License is distributed on an "AS IS" BASIS, WITHOUT -WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - -See the licenses for the specific language governing permissions, limitations -and more details. - -You should have received a copy of the EUPL1.1 and the LGPLv3 licenses along -with this program; if not, you may find them at: - -https://joinup.ec.europa.eu/software/page/eupl/licence-eupl -http://www.gnu.org/licenses/ -and -https://www.gnu.org/licenses/lgpl.txt -**/ -package org.bimrocket.ihub.util; - -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import com.fasterxml.jackson.databind.ObjectMapper; -import com.fasterxml.jackson.databind.node.ArrayNode; - -/** - * - * @author kfiertek-nexus-geographics - * - */ -public abstract class BatchUpdateRunnable implements Runnable -{ - private static final Logger log = LoggerFactory - .getLogger(BatchUpdateRunnable.class); - - protected ArrayNode all; - protected int saveableSize; - protected boolean runCheck; - protected boolean saveCheck; - protected ObjectMapper mapper; - - protected BatchUpdateRunnable(ArrayNode all, int saveableSize) - { - this.all = all; - this.saveableSize = saveableSize; - this.runCheck = true; - this.saveCheck = false; - this.mapper = new ObjectMapper(); - } - - @Override - public void run() - { - while (runCheck) - { - if ((all.size() >= this.saveableSize) || saveCheck) - { - this.save(); - } - synchronized (this) - { - try - { - wait(); - } - catch (InterruptedException e) - { - log.error("Exception occurred while waiting ::", e); - } - - } - - } - log.debug("BatchWriterRunnable end"); - } - - public void runSave() { - this.saveCheck = true; - } - - protected void stop() { - this.runCheck = false; - } - - protected void save() - { - this.batchUpdate(); - this.cleanUp(); - } - - protected abstract void batchUpdate(); - - private void cleanUp() - { - this.all.removeAll(); - this.saveCheck = false; - } -} diff --git a/src/main/resources/application.properties b/src/main/resources/application.properties index e533e0a..b6874d9 100644 --- a/src/main/resources/application.properties +++ b/src/main/resources/application.properties @@ -1,5 +1,5 @@ #ihub config -ihub.spark.enabled=${IHUB.SPARK.ENABLED:#{"true"}} +ihub.spark.enabled=true # spark context configuration ihub.spark.url=${IHUB.SPARK.URL:#{"local[*]"}} From 4268f239dc7eaa40dbe94cdef0cd2266a2971a34 Mon Sep 17 00:00:00 2001 From: "NG\\kfiertek" Date: Thu, 24 Mar 2022 18:02:17 +0100 Subject: [PATCH 3/3] fix bug SparkWriter --- .../org/bimrocket/ihub/processors/spark/SparkWriter.java | 9 ++++----- 1 file changed, 4 insertions(+), 5 deletions(-) diff --git a/src/main/java/org/bimrocket/ihub/processors/spark/SparkWriter.java b/src/main/java/org/bimrocket/ihub/processors/spark/SparkWriter.java index e36ad7c..ddc1b56 100644 --- a/src/main/java/org/bimrocket/ihub/processors/spark/SparkWriter.java +++ b/src/main/java/org/bimrocket/ihub/processors/spark/SparkWriter.java @@ -32,13 +32,11 @@ the terms of the European Public License (EUPL), either version 1.1 or (at import java.util.Arrays; import java.util.List; -import java.util.Properties; -import java.util.concurrent.ExecutorService; -import java.util.concurrent.Executors; import org.apache.spark.SparkContext; import org.apache.spark.sql.Dataset; import org.apache.spark.sql.Encoders; +import org.apache.spark.sql.Row; import org.apache.spark.sql.SparkSession; import org.bimrocket.ihub.config.SparkConfig; import org.bimrocket.ihub.connector.ProcessedObject; @@ -148,8 +146,9 @@ protected void batchUpdate() List data = Arrays.asList(json); Dataset ds = session.createDataset(data, Encoders.STRING()); - - ds.write().mode(mode) + Dataset rows = session.read().json(ds); + + rows.write().mode(mode) .format(format) .save(fsPath + fileName);