Skip to content
Closed
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -58,10 +58,12 @@ object HadoopUtils extends Logger {

private[this] var tgt: KerberosTicket = _

private[this] val hadoopLock = new Object()

private[this] lazy val configurationCache: util.Map[String, Configuration] =
new ConcurrentHashMap[String, Configuration]()

def getUgi(): UserGroupInformation = {
def getUgi(): UserGroupInformation = hadoopLock.synchronized {
if (ugi == null) {
ugi = if (HadoopConfigUtils.kerberosEnable) {
getKerberosUGI()
Expand Down Expand Up @@ -128,25 +130,27 @@ object HadoopUtils extends Logger {
* Automatically goes to $HADOOP_HOME/etc/hadoop to load the configuration.<br> We recommend the
* second method, without copying the configuration files.<br> </pre>
*/
def hadoopConf: Configuration = Option(reusableConf).getOrElse {
reusableConf = getConfigurationFromHadoopConfDir(hadoopConfDir)
// add hadoopConfDir to classpath...you know why???
ClassLoaderUtils.loadResource(hadoopConfDir)

if (StringUtils.isBlank(reusableConf.get("hadoop.tmp.dir"))) {
reusableConf.set("hadoop.tmp.dir", "/tmp")
}
if (StringUtils.isBlank(reusableConf.get("hbase.fs.tmp.dir"))) {
reusableConf.set("hbase.fs.tmp.dir", "/tmp")
def hadoopConf: Configuration = hadoopLock.synchronized {
Option(reusableConf).getOrElse {
reusableConf = getConfigurationFromHadoopConfDir(hadoopConfDir)
// add hadoopConfDir to classpath...you know why???
ClassLoaderUtils.loadResource(hadoopConfDir)

if (StringUtils.isBlank(reusableConf.get("hadoop.tmp.dir"))) {
reusableConf.set("hadoop.tmp.dir", "/tmp")
}
if (StringUtils.isBlank(reusableConf.get("hbase.fs.tmp.dir"))) {
reusableConf.set("hbase.fs.tmp.dir", "/tmp")
}
// disable timeline service as we only query yarn app here.
// Otherwise we may hit this kind of ERROR:
// java.lang.ClassNotFoundException: com.sun.jersey.api.client.config.ClientConfig
reusableConf.set("yarn.timeline-service.enabled", "false")
reusableConf.set("fs.hdfs.impl", classOf[DistributedFileSystem].getName)
reusableConf.set("fs.file.impl", classOf[LocalFileSystem].getName)
reusableConf.set("fs.hdfs.impl.disable.cache", "true")
reusableConf
}
// disable timeline service as we only query yarn app here.
// Otherwise we may hit this kind of ERROR:
// java.lang.ClassNotFoundException: com.sun.jersey.api.client.config.ClientConfig
reusableConf.set("yarn.timeline-service.enabled", "false")
reusableConf.set("fs.hdfs.impl", classOf[DistributedFileSystem].getName)
reusableConf.set("fs.file.impl", classOf[LocalFileSystem].getName)
reusableConf.set("fs.hdfs.impl.disable.cache", "true")
reusableConf
}

private[this] def closeHadoop(): Unit = {
Expand Down Expand Up @@ -199,7 +203,7 @@ object HadoopUtils extends Logger {
}
}

def hdfs: FileSystem = {
def hdfs: FileSystem = hadoopLock.synchronized {
Option(reusableHdfs).getOrElse {
reusableHdfs = Try {
getUgi().doAs[FileSystem](new PrivilegedAction[FileSystem]() {
Expand All @@ -214,7 +218,7 @@ object HadoopUtils extends Logger {
val timer = new Timer()
timer.schedule(
new TimerTask {
override def run(): Unit = {
override def run(): Unit = hadoopLock.synchronized {
closeHadoop()
logInfo(
s"Check Kerberos Tgt And reLogin From Keytab Finish:refresh time: ${DateUtils.format()}")
Expand All @@ -231,7 +235,7 @@ object HadoopUtils extends Logger {
}
}

def yarnClient: YarnClient = {
def yarnClient: YarnClient = hadoopLock.synchronized {
if (reusableYarnClient == null || !reusableYarnClient.isInState(STATE.STARTED)) {
reusableYarnClient = Try {
getUgi().doAs(new PrivilegedAction[YarnClient]() {
Expand Down
Loading