Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
42 changes: 42 additions & 0 deletions pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,48 @@
</repositories>

<dependencies>
<!-- SPARK -->
<dependency>
<groupId>org.codehaus.janino</groupId>
<artifactId>commons-compiler</artifactId>
<version>3.0.8</version>
</dependency>
<dependency>
<groupId>org.codehaus.janino</groupId>
<artifactId>janino</artifactId>
<version>3.0.8</version>
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-core_2.13</artifactId>
<version>3.2.0</version>
<exclusions>
<exclusion>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-log4j12</artifactId>
</exclusion>
<exclusion>
<groupId>org.codehaus.janino</groupId>
<artifactId>commons-compiler</artifactId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-sql_2.13</artifactId>
<version>3.2.0</version>
<exclusions>
<exclusion>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-log4j12</artifactId>
</exclusion>
<exclusion>
<groupId>org.codehaus.janino</groupId>
<artifactId>commons-compiler</artifactId>
</exclusion>
</exclusions>
</dependency>

<!-- SPRING -->
<dependency>
<groupId>org.springframework.boot</groupId>
Expand Down
131 changes: 131 additions & 0 deletions src/main/java/org/bimrocket/ihub/config/SparkConfig.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,131 @@
/**
BIMROCKET

Copyright (C) 2022, Ajuntament de Sant Feliu de Llobregat

This program is licensed and may be used, modified and redistributed under
the terms of the European Public License (EUPL), either version 1.1 or (at
your option) any later version as soon as they are approved by the European
Commission.

Alternatively, you may redistribute and/or modify this program under the
terms of the GNU Lesser General Public License as published by the Free
Software Foundation; either version 3 of the License, or (at your option)
any later version.

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.

See the licenses for the specific language governing permissions, limitations
and more details.

You should have received a copy of the EUPL1.1 and the LGPLv3 licenses along
with this program; if not, you may find them at:

https://joinup.ec.europa.eu/software/page/eupl/licence-eupl
http://www.gnu.org/licenses/
and
https://www.gnu.org/licenses/lgpl.txt
**/
package org.bimrocket.ihub.config;

import org.apache.spark.sql.SparkSession;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.boot.context.event.ApplicationReadyEvent;
import org.springframework.context.annotation.Configuration;

@Configuration
public class SparkConfig
{
private static SparkSession session;

private static boolean enabled;
private static String sparkUrl;
private static String sparkTimezone;
private static String sparkDriverHost;
private static String sparkDriverPort;
private static String sparkBlockManagerPort;
private static String sparkDriverMemory;
private static String sparkDeployMode;
private static String sparkExecutorCores;
private static String sparkExecutorMemory;

public static SparkSession buildContext() throws Exception {
if (SparkConfig.session != null) {
return SparkConfig.getSession();
}
if (!SparkConfig.enabled) {
throw new Exception("Spark not enabled");
}
SparkConfig.session = SparkSession.builder()
.master(sparkUrl)
.config("spark.sql.session.timezone",sparkTimezone)
.config("spark.blockManager.port", sparkBlockManagerPort)
.config("spark.driver.port", sparkDriverPort)
.config("spark.driver.host", sparkDriverHost)
.config("spark.driver.memory", sparkDriverMemory)
.config("spark.driver.bindAddress", "0.0.0.0")
.config("spark.submit.deployMode", sparkDeployMode)
.config("spark.executor.core",sparkExecutorCores)
.config("spark.executor.memory", sparkExecutorMemory)
.getOrCreate();
return SparkConfig.getSession();
}

public static SparkSession getSession() {
return SparkConfig.session;
}

@Value("${ihub.spark.enabled}")
public void setEnabled(String enabled) {
SparkConfig.enabled = Boolean.valueOf(enabled);
};

@Value("${ihub.spark.url}")
public void setUrl(String url) {
SparkConfig.sparkUrl = url;
};

@Value("${ihub.spark.timezone}")
public void setTimezone(String sparkTimezone) {
SparkConfig.sparkTimezone = sparkTimezone;
};

@Value("${ihub.spark.driver.host}")
public void setDriverHost(String sparkDriverHost) {
SparkConfig.sparkDriverHost = sparkDriverHost;
};

@Value("${ihub.spark.driver.port}")
public void setDriverPort(String sparkDriverPort) {
SparkConfig.sparkDriverPort = sparkDriverPort;
};

@Value("${ihub.spark.blockmanager.port}")
public void setBlockmanagerPort(String sparkBlockManagerPort) {
SparkConfig.sparkBlockManagerPort = sparkBlockManagerPort;
};

@Value("${ihub.spark.driver.memory}")
public void setDriverMemory(String sparkDriverMemory) {
SparkConfig.sparkDriverMemory = sparkDriverMemory;
};

@Value("${ihub.spark.deploy.mode}")
public void setDeployMode(String sparkDeployMode) {
SparkConfig.sparkDeployMode = sparkDeployMode;
};

@Value("${ihub.spark.executor.cores}")
public void setExecutorCores(String sparkExecutorCores) {
SparkConfig.sparkExecutorCores = sparkExecutorCores;
};

@Value("${ihub.spark.executor.memory}")
public void setExecutorMemory(String sparkExecutorMemory) {
SparkConfig.sparkExecutorMemory = sparkExecutorMemory;
};


}
105 changes: 50 additions & 55 deletions src/main/java/org/bimrocket/ihub/connector/Connector.java
Original file line number Diff line number Diff line change
Expand Up @@ -53,8 +53,7 @@
*/
public class Connector implements Runnable
{
private static final Logger log =
LoggerFactory.getLogger(Connector.class);
private static final Logger log = LoggerFactory.getLogger(Connector.class);

public static final String RUNNING_STATUS = "RUNNING";
public static final String STOPPED_STATUS = "STOPPED";
Expand Down Expand Up @@ -264,25 +263,14 @@ public String toString()
{
StringBuilder buffer = new StringBuilder();

return buffer.append("Connector")
.append("{ name: \"")
.append(name)
.append("\", description: \"")
.append(description == null ? "" : description)
.append("\", inventory: \"")
.append(inventory)
.append("\", status: ")
.append(getStatus())
.append(", autoStart: ")
.append(autoStart)
.append(", singleRun: ")
.append(singleRun)
.append(", debug: ")
.append(debugEnabled)
.append(", processors: ")
.append(processors)
.append(" }")
.toString();
return buffer.append("Connector").append("{ name: \"").append(name)
.append("\", description: \"")
.append(description == null ? "" : description)
.append("\", inventory: \"").append(inventory).append("\", status: ")
.append(getStatus()).append(", autoStart: ").append(autoStart)
.append(", singleRun: ").append(singleRun).append(", debug: ")
.append(debugEnabled).append(", processors: ").append(processors)
.append(" }").toString();
}

@Override
Expand All @@ -301,8 +289,8 @@ public void run()
{
if (!initProcessors(runningProcessors))
throw new ProcessorInitException(427,
"Failed to initialize processor %s: %s", name,
lastError.getMessage());
"Failed to initialize processor %s: %s", name,
lastError.getMessage());

while (!end)
{
Expand All @@ -313,29 +301,34 @@ public void run()
{
if (processor.isEnabled())
{
log.debug("Executing processor {}",
processor.getClass().getName());

log.debug("Executing processor {}", processor.getClass().getName());
if (processor.processObject(procObject))
{
processorCount++;
}
else break;
else
break;
}
}

if (!procObject.isIgnore())
{
updateIdPairRepository(procObject);
updateStatistics(procObject);
log.debug("Object processed, type: {}, operation: {}, "
+ "localId: {}, globalId: {}",
procObject.getObjectType(), procObject.getOperation(),
procObject.getLocalId(), procObject.getGlobalId());
log.debug(
"Object processed, type: {}, operation: {}, "
+ "localId: {}, globalId: {}",
procObject.getObjectType(), procObject.getOperation(),
procObject.getLocalId(), procObject.getGlobalId());
}

if (processorCount == 0)
{
for (var processor : runningProcessors)
{
if (processor.isEnabled())
processor.afterProcessing();
}
if (singleRun)
{
end = true;
Expand All @@ -356,12 +349,13 @@ public void run()
catch (ProcessorInitException ex)
{
lastError = ex;
log.error(ex.getMessage());
log.error("ProcessorInitException message :: {}, stacktrace ::",
ex.getMessage(), ex);
}
catch (Exception ex)
{
lastError = ex;
log.error("An error has ocurred: {}", ex.toString());
log.error("An error has ocurred: {}", ex.toString(), ex);
}
finally
{
Expand Down Expand Up @@ -405,7 +399,7 @@ public Connector save()
public ConnectorSetup saveSetup()
{
ConnectorSetup connSetup = service.getConnectorMapperService()
.getConnectorSetup(this);
.getConnectorSetup(this);

service.getConnectorSetupRepository().save(connSetup);

Expand All @@ -428,12 +422,12 @@ public Connector restore() throws Exception
public ConnectorSetup restoreSetup() throws Exception
{
Optional<ConnectorSetup> optConnSetup = service
.getConnectorSetupRepository().findById(name);
.getConnectorSetupRepository().findById(name);
if (optConnSetup.isPresent())
{
ConnectorSetup connSetup = optConnSetup.get();
service.getConnectorMapperService().setConnectorSetup(this, connSetup,
true);
true);

unsaved = false;
lastError = null;
Expand All @@ -451,14 +445,14 @@ protected boolean initProcessors(List<Processor> processors)
{
processor.init();
log.debug("Processor #{}: {} initialized.", initialized,
processor.getClass().getName());
processor.getClass().getName());

initialized++;
}
catch (Exception ex)
{
log.debug("Failed to initialize processor #{}: {}: {}", initialized,
processor.getClass().getName(), ex.toString());
processor.getClass().getName(), ex.toString(), ex);
lastError = ex;
break;
}
Expand All @@ -485,16 +479,17 @@ public void endProcessors(List<Processor> processors)
{
processor.end();
log.debug("Processor #{}: {} ended.", ended,
processor.getClass().getName());
processor.getClass().getName());

ended++;
}
catch (Exception ex)
{
log.debug("Failed to end processor #{}: {}: {}", ended,
processor.getClass().getName(), ex.toString());
processor.getClass().getName(), ex.toString());

if (lastError == null) lastError = ex;
if (lastError == null)
lastError = ex;
}
}
}
Expand All @@ -503,9 +498,9 @@ void updateIdPairRepository(ProcessedObject procObject)
{
IdPairRepository idPairRepository = service.getIdPairRepository();

Optional<IdPair> result = idPairRepository.
findByInventoryAndObjectTypeAndLocalId(inventory,
procObject.getObjectType(), procObject.getLocalId());
Optional<IdPair> result = idPairRepository
.findByInventoryAndObjectTypeAndLocalId(inventory,
procObject.getObjectType(), procObject.getLocalId());

if (procObject.isDelete())
{
Expand Down Expand Up @@ -552,17 +547,17 @@ void updateStatistics(ProcessedObject procObject)
processed++;
switch (procObject.getOperation())
{
case INSERT:
inserted++;
break;
case UPDATE:
updated++;
break;
case DELETE:
deleted++;
break;
default:
ignored++;
case INSERT:
inserted++;
break;
case UPDATE:
updated++;
break;
case DELETE:
deleted++;
break;
default:
ignored++;
}
}
}
Loading