diff --git a/pom.xml b/pom.xml index 2ac9d89..b0a1f30 100644 --- a/pom.xml +++ b/pom.xml @@ -49,6 +49,48 @@ + + + org.codehaus.janino + commons-compiler + 3.0.8 + + + org.codehaus.janino + janino + 3.0.8 + + + org.apache.spark + spark-core_2.13 + 3.2.0 + + + org.slf4j + slf4j-log4j12 + + + org.codehaus.janino + commons-compiler + + + + + org.apache.spark + spark-sql_2.13 + 3.2.0 + + + org.slf4j + slf4j-log4j12 + + + org.codehaus.janino + commons-compiler + + + + org.springframework.boot diff --git a/src/main/java/org/bimrocket/ihub/config/SparkConfig.java b/src/main/java/org/bimrocket/ihub/config/SparkConfig.java new file mode 100644 index 0000000..5d06d6e --- /dev/null +++ b/src/main/java/org/bimrocket/ihub/config/SparkConfig.java @@ -0,0 +1,131 @@ +/** +BIMROCKET + +Copyright (C) 2022, Ajuntament de Sant Feliu de Llobregat + +This program is licensed and may be used, modified and redistributed under +the terms of the European Public License (EUPL), either version 1.1 or (at +your option) any later version as soon as they are approved by the European +Commission. + +Alternatively, you may redistribute and/or modify this program under the +terms of the GNU Lesser General Public License as published by the Free +Software Foundation; either version 3 of the License, or (at your option) +any later version. + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + +See the licenses for the specific language governing permissions, limitations +and more details. 
+ +You should have received a copy of the EUPL1.1 and the LGPLv3 licenses along +with this program; if not, you may find them at: + +https://joinup.ec.europa.eu/software/page/eupl/licence-eupl +http://www.gnu.org/licenses/ +and +https://www.gnu.org/licenses/lgpl.txt +**/ +package org.bimrocket.ihub.config; + +import org.apache.spark.sql.SparkSession; +import org.springframework.beans.factory.annotation.Value; +import org.springframework.boot.context.event.ApplicationReadyEvent; +import org.springframework.context.annotation.Configuration; + +@Configuration +public class SparkConfig +{ + private static SparkSession session; + + private static boolean enabled; + private static String sparkUrl; + private static String sparkTimezone; + private static String sparkDriverHost; + private static String sparkDriverPort; + private static String sparkBlockManagerPort; + private static String sparkDriverMemory; + private static String sparkDeployMode; + private static String sparkExecutorCores; + private static String sparkExecutorMemory; + + public static SparkSession buildContext() throws Exception { + if (SparkConfig.session != null) { + return SparkConfig.getSession(); + } + if (!SparkConfig.enabled) { + throw new Exception("Spark not enabled"); + } + SparkConfig.session = SparkSession.builder() + .master(sparkUrl) + .config("spark.sql.session.timezone",sparkTimezone) + .config("spark.blockManager.port", sparkBlockManagerPort) + .config("spark.driver.port", sparkDriverPort) + .config("spark.driver.host", sparkDriverHost) + .config("spark.driver.memory", sparkDriverMemory) + .config("spark.driver.bindAddress", "0.0.0.0") + .config("spark.submit.deployMode", sparkDeployMode) + .config("spark.executor.cores",sparkExecutorCores) + .config("spark.executor.memory", sparkExecutorMemory) + .getOrCreate(); + return SparkConfig.getSession(); + } + + public static SparkSession getSession() { + return SparkConfig.session; + } + + @Value("${ihub.spark.enabled}") + public void 
setEnabled(String enabled) { + SparkConfig.enabled = Boolean.valueOf(enabled); + }; + + @Value("${ihub.spark.url}") + public void setUrl(String url) { + SparkConfig.sparkUrl = url; + }; + + @Value("${ihub.spark.timezone}") + public void setTimezone(String sparkTimezone) { + SparkConfig.sparkTimezone = sparkTimezone; + }; + + @Value("${ihub.spark.driver.host}") + public void setDriverHost(String sparkDriverHost) { + SparkConfig.sparkDriverHost = sparkDriverHost; + }; + + @Value("${ihub.spark.driver.port}") + public void setDriverPort(String sparkDriverPort) { + SparkConfig.sparkDriverPort = sparkDriverPort; + }; + + @Value("${ihub.spark.blockmanager.port}") + public void setBlockmanagerPort(String sparkBlockManagerPort) { + SparkConfig.sparkBlockManagerPort = sparkBlockManagerPort; + }; + + @Value("${ihub.spark.driver.memory}") + public void setDriverMemory(String sparkDriverMemory) { + SparkConfig.sparkDriverMemory = sparkDriverMemory; + }; + + @Value("${ihub.spark.deploy.mode}") + public void setDeployMode(String sparkDeployMode) { + SparkConfig.sparkDeployMode = sparkDeployMode; + }; + + @Value("${ihub.spark.executor.cores}") + public void setExecutorCores(String sparkExecutorCores) { + SparkConfig.sparkExecutorCores = sparkExecutorCores; + }; + + @Value("${ihub.spark.executor.memory}") + public void setExecutorMemory(String sparkExecutorMemory) { + SparkConfig.sparkExecutorMemory = sparkExecutorMemory; + }; + + +} diff --git a/src/main/java/org/bimrocket/ihub/connector/Connector.java b/src/main/java/org/bimrocket/ihub/connector/Connector.java index c68ffb7..dc52af2 100644 --- a/src/main/java/org/bimrocket/ihub/connector/Connector.java +++ b/src/main/java/org/bimrocket/ihub/connector/Connector.java @@ -53,8 +53,7 @@ */ public class Connector implements Runnable { - private static final Logger log = - LoggerFactory.getLogger(Connector.class); + private static final Logger log = LoggerFactory.getLogger(Connector.class); public static final String RUNNING_STATUS = 
"RUNNING"; public static final String STOPPED_STATUS = "STOPPED"; @@ -264,25 +263,14 @@ public String toString() { StringBuilder buffer = new StringBuilder(); - return buffer.append("Connector") - .append("{ name: \"") - .append(name) - .append("\", description: \"") - .append(description == null ? "" : description) - .append("\", inventory: \"") - .append(inventory) - .append("\", status: ") - .append(getStatus()) - .append(", autoStart: ") - .append(autoStart) - .append(", singleRun: ") - .append(singleRun) - .append(", debug: ") - .append(debugEnabled) - .append(", processors: ") - .append(processors) - .append(" }") - .toString(); + return buffer.append("Connector").append("{ name: \"").append(name) + .append("\", description: \"") + .append(description == null ? "" : description) + .append("\", inventory: \"").append(inventory).append("\", status: ") + .append(getStatus()).append(", autoStart: ").append(autoStart) + .append(", singleRun: ").append(singleRun).append(", debug: ") + .append(debugEnabled).append(", processors: ").append(processors) + .append(" }").toString(); } @Override @@ -301,8 +289,8 @@ public void run() { if (!initProcessors(runningProcessors)) throw new ProcessorInitException(427, - "Failed to initialize processor %s: %s", name, - lastError.getMessage()); + "Failed to initialize processor %s: %s", name, + lastError.getMessage()); while (!end) { @@ -313,14 +301,13 @@ public void run() { if (processor.isEnabled()) { - log.debug("Executing processor {}", - processor.getClass().getName()); - + log.debug("Executing processor {}", processor.getClass().getName()); if (processor.processObject(procObject)) { processorCount++; } - else break; + else + break; } } @@ -328,14 +315,20 @@ public void run() { updateIdPairRepository(procObject); updateStatistics(procObject); - log.debug("Object processed, type: {}, operation: {}, " - + "localId: {}, globalId: {}", - procObject.getObjectType(), procObject.getOperation(), - procObject.getLocalId(), 
procObject.getGlobalId()); + log.debug( + "Object processed, type: {}, operation: {}, " + + "localId: {}, globalId: {}", + procObject.getObjectType(), procObject.getOperation(), + procObject.getLocalId(), procObject.getGlobalId()); } if (processorCount == 0) { + for (var processor : runningProcessors) + { + if (processor.isEnabled()) + processor.afterProcessing(); + } if (singleRun) { end = true; @@ -356,12 +349,13 @@ public void run() catch (ProcessorInitException ex) { lastError = ex; - log.error(ex.getMessage()); + log.error("ProcessorInitException message :: {}, stacktrace ::", + ex.getMessage(), ex); } catch (Exception ex) { lastError = ex; - log.error("An error has ocurred: {}", ex.toString()); + log.error("An error has ocurred: {}", ex.toString(), ex); } finally { @@ -405,7 +399,7 @@ public Connector save() public ConnectorSetup saveSetup() { ConnectorSetup connSetup = service.getConnectorMapperService() - .getConnectorSetup(this); + .getConnectorSetup(this); service.getConnectorSetupRepository().save(connSetup); @@ -428,12 +422,12 @@ public Connector restore() throws Exception public ConnectorSetup restoreSetup() throws Exception { Optional optConnSetup = service - .getConnectorSetupRepository().findById(name); + .getConnectorSetupRepository().findById(name); if (optConnSetup.isPresent()) { ConnectorSetup connSetup = optConnSetup.get(); service.getConnectorMapperService().setConnectorSetup(this, connSetup, - true); + true); unsaved = false; lastError = null; @@ -451,14 +445,14 @@ protected boolean initProcessors(List processors) { processor.init(); log.debug("Processor #{}: {} initialized.", initialized, - processor.getClass().getName()); + processor.getClass().getName()); initialized++; } catch (Exception ex) { log.debug("Failed to initialize processor #{}: {}: {}", initialized, - processor.getClass().getName(), ex.toString()); + processor.getClass().getName(), ex.toString(), ex); lastError = ex; break; } @@ -485,16 +479,17 @@ public void 
endProcessors(List processors) { processor.end(); log.debug("Processor #{}: {} ended.", ended, - processor.getClass().getName()); + processor.getClass().getName()); ended++; } catch (Exception ex) { log.debug("Failed to end processor #{}: {}: {}", ended, - processor.getClass().getName(), ex.toString()); + processor.getClass().getName(), ex.toString()); - if (lastError == null) lastError = ex; + if (lastError == null) + lastError = ex; } } } @@ -503,9 +498,9 @@ void updateIdPairRepository(ProcessedObject procObject) { IdPairRepository idPairRepository = service.getIdPairRepository(); - Optional result = idPairRepository. - findByInventoryAndObjectTypeAndLocalId(inventory, - procObject.getObjectType(), procObject.getLocalId()); + Optional result = idPairRepository + .findByInventoryAndObjectTypeAndLocalId(inventory, + procObject.getObjectType(), procObject.getLocalId()); if (procObject.isDelete()) { @@ -552,17 +547,17 @@ void updateStatistics(ProcessedObject procObject) processed++; switch (procObject.getOperation()) { - case INSERT: - inserted++; - break; - case UPDATE: - updated++; - break; - case DELETE: - deleted++; - break; - default: - ignored++; + case INSERT: + inserted++; + break; + case UPDATE: + updated++; + break; + case DELETE: + deleted++; + break; + default: + ignored++; } } } diff --git a/src/main/java/org/bimrocket/ihub/connector/Processor.java b/src/main/java/org/bimrocket/ihub/connector/Processor.java index c8bc713..35e4ad3 100644 --- a/src/main/java/org/bimrocket/ihub/connector/Processor.java +++ b/src/main/java/org/bimrocket/ihub/connector/Processor.java @@ -31,6 +31,8 @@ package org.bimrocket.ihub.connector; +import java.util.Properties; + /** * * @author realor @@ -40,6 +42,7 @@ public abstract class Processor private Connector connector; protected String description; protected boolean enabled; + protected Properties props = new Properties(); public Processor() { @@ -80,6 +83,14 @@ public void setEnabled(boolean enabled) { this.enabled = 
enabled; } + + public Properties getProperties() { + return this.props; + } + + public void setProperty(String key, String value) { + this.props.setProperty(key, value); + } /** @@ -110,6 +121,10 @@ public void init() throws Exception public void end() { } + + public void afterProcessing() { + + } @Override public String toString() diff --git a/src/main/java/org/bimrocket/ihub/processors/kafka/JsonKafkaSender.java b/src/main/java/org/bimrocket/ihub/processors/kafka/JsonKafkaSender.java index ed13e22..b103d80 100644 --- a/src/main/java/org/bimrocket/ihub/processors/kafka/JsonKafkaSender.java +++ b/src/main/java/org/bimrocket/ihub/processors/kafka/JsonKafkaSender.java @@ -47,7 +47,7 @@ public class JsonKafkaSender extends KafkaSender @Override public boolean processObject(ProcessedObject procObject) { - JsonNode toSend = procObject.getGlobalObject(); + JsonNode toSend = procObject.getGlobalObject() != null ? procObject.getGlobalObject() : procObject.getLocalObject(); if (toSend == null) { return false; diff --git a/src/main/java/org/bimrocket/ihub/processors/kafka/KafkaLoader.java b/src/main/java/org/bimrocket/ihub/processors/kafka/KafkaLoader.java index 2c17c1d..e049403 100644 --- a/src/main/java/org/bimrocket/ihub/processors/kafka/KafkaLoader.java +++ b/src/main/java/org/bimrocket/ihub/processors/kafka/KafkaLoader.java @@ -50,7 +50,7 @@ public abstract class KafkaLoader extends Loader description = "Kafka bootstrap servers address") public String bootstrapAddress; - @ConfigProperty(name = "groudId", + @ConfigProperty(name = "groupId", description = "Number of the kafka group to which the loader belongs.") public String groupId; diff --git a/src/main/java/org/bimrocket/ihub/processors/spark/SparkWriter.java b/src/main/java/org/bimrocket/ihub/processors/spark/SparkWriter.java new file mode 100644 index 0000000..ddc1b56 --- /dev/null +++ b/src/main/java/org/bimrocket/ihub/processors/spark/SparkWriter.java @@ -0,0 +1,168 @@ +/** +BIMROCKET + +Copyright (C) 2022, 
Ajuntament de Sant Feliu de Llobregat + +This program is licensed and may be used, modified and redistributed under +the terms of the European Public License (EUPL), either version 1.1 or (at +your option) any later version as soon as they are approved by the European +Commission. + +Alternatively, you may redistribute and/or modify this program under the +terms of the GNU Lesser General Public License as published by the Free +Software Foundation; either version 3 of the License, or (at your option) +any later version. + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + +See the licenses for the specific language governing permissions, limitations +and more details. + +You should have received a copy of the EUPL1.1 and the LGPLv3 licenses along +with this program; if not, you may find them at: + +https://joinup.ec.europa.eu/software/page/eupl/licence-eupl +http://www.gnu.org/licenses/ +and +https://www.gnu.org/licenses/lgpl.txt +**/ +package org.bimrocket.ihub.processors.spark; + +import java.util.Arrays; +import java.util.List; + +import org.apache.spark.SparkContext; +import org.apache.spark.sql.Dataset; +import org.apache.spark.sql.Encoders; +import org.apache.spark.sql.Row; +import org.apache.spark.sql.SparkSession; +import org.bimrocket.ihub.config.SparkConfig; +import org.bimrocket.ihub.connector.ProcessedObject; +import org.bimrocket.ihub.processors.Sender; +import org.bimrocket.ihub.util.ConfigProperty; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import com.fasterxml.jackson.core.JsonProcessingException; +import com.fasterxml.jackson.databind.JsonNode; +import com.fasterxml.jackson.databind.ObjectMapper; +import com.fasterxml.jackson.databind.node.ArrayNode; + +public class SparkWriter extends Sender +{ + private static final Logger log = + 
LoggerFactory.getLogger(SparkWriter.class); + + public static final String PROPS_FORMAT = "format"; + public static final String PROPS_MODE = "mode"; + public static final String PROPS_FILE_SYSTEM_PATH = "fsPath"; + public static final String PROPS_FILE_NAME= "fileName"; + public static final String PROPS_UPDATEABLE_LENGTH = "updateableLength"; + + + @ConfigProperty(name = SparkWriter.PROPS_FORMAT, + description = "Spark Format to write in.") + public String format = "parquet"; + + @ConfigProperty(name = SparkWriter.PROPS_MODE, + description = "Spark write mode to write in") + public String mode = "append"; + + @ConfigProperty(name = SparkWriter.PROPS_FILE_SYSTEM_PATH, + description = "Path to storage directory can be hdfs (hdfs://server/) or normal fs", required = false) + public String fsPath = "C:\\"; + + @ConfigProperty(name = SparkWriter.PROPS_FILE_NAME, + description = "Name of file to write in") + public String fileName; + + @ConfigProperty(name = SparkWriter.PROPS_UPDATEABLE_LENGTH, + description = "Amount of records to perform update/save", required = false) + public Integer updateAmount = 50; + + private ArrayNode all; + private SparkSession session; + private SparkContext context; + private ObjectMapper mapper; + private boolean save; + + + @Override + public void init() throws Exception + { + super.init(); + this.save = false; + this.mapper = new ObjectMapper(); + this.all = mapper.createArrayNode(); + this.session = SparkConfig.buildContext(); + this.context = this.session.sparkContext(); + } + + @Override + public boolean processObject(ProcessedObject procObject) + { + if (procObject.isIgnore()) { + this.batchUpdate(); + return false; + } + + JsonNode toSend = procObject.getGlobalObject() != null ? 
procObject.getGlobalObject() : procObject.getLocalObject(); + if (toSend == null) + { + this.batchUpdate(); + return false; + } + + all.add(toSend); + this.batchUpdate(); + log.debug("adding {} json object to all elements array", toSend.toPrettyString()); + + return true; + } + + private void runSave() { + this.save = true; + this.batchUpdate(); + this.cleanUp(); + } + + protected void batchUpdate() + { + if ((all.size() > 0 && save) || all.size() > updateAmount) { + String json = null; + try + { + json = mapper.writeValueAsString(all); + } + catch (JsonProcessingException e) + { + log.error("While transforming all ArrayNode" + + " to json String Exception has occurred ", e); + } + if (json == null) + return; + List data = Arrays.asList(json); + + Dataset ds = session.createDataset(data, Encoders.STRING()); + Dataset rows = session.read().json(ds); + + rows.write().mode(mode) + .format(format) + .save(fsPath + + fileName); + } + + } + + private void cleanUp() { + this.save = false; + this.all.removeAll(); + } + + @Override + public void afterProcessing() { + this.runSave(); + } +} \ No newline at end of file diff --git a/src/main/java/org/bimrocket/ihub/service/ConnectorMapperService.java b/src/main/java/org/bimrocket/ihub/service/ConnectorMapperService.java index a64fc23..bbd8a85 100644 --- a/src/main/java/org/bimrocket/ihub/service/ConnectorMapperService.java +++ b/src/main/java/org/bimrocket/ihub/service/ConnectorMapperService.java @@ -278,6 +278,7 @@ public void setProcessorProperties(Processor processor, try { + processor.setProperty(propertyName, propertyValue.toString()); propHandler.setValue(processor, propertyValue); } catch (Exception ex) diff --git a/src/main/resources/application.properties b/src/main/resources/application.properties index 56f31d2..b6874d9 100644 --- a/src/main/resources/application.properties +++ b/src/main/resources/application.properties @@ -1,3 +1,20 @@ +#ihub config +ihub.spark.enabled=true + +# spark context configuration 
+ihub.spark.url=${IHUB.SPARK.URL:#{"local[*]"}} +ihub.spark.timezone=${IHUB.SPARK.TIMEZONE:#{"UTC"}} +ihub.spark.driver.host=${IHUB.SPARK.DRIVER.HOST:#{"127.0.0.1"}} +ihub.spark.driver.port=${IHUB.SPARK.DRIVER.PORT:#{"10027"}} +ihub.spark.driver.memory=${IHUB.SPARK.DRIVER.MEMORY:#{"4G"}} +ihub.spark.blockmanager.port=${IHUB.SPARK.BLOCKMANAGER.PORT:#{"10028"}} +ihub.spark.deploy.mode=${IHUB.SPARK.DEPLOY.MODE:#{"cluster"}} + +#spark executor configuration +ihub.spark.executor.cores=${IHUB.SPARK.EXECUTOR.CORES:#{"2"}} +ihub.spark.executor.memory=${IHUB.SPARK.EXECUTOR.MEMORY:#{"2G"}} + + # security spring.security.user.name=admin spring.security.user.password=admin