Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@

package org.apache.streampark.common.conf

import org.apache.streampark.common.util.{CommandUtils, Logger}
import org.apache.streampark.common.util.{CommandUtils, FlinkEnvUtils, Logger}
import org.apache.streampark.common.util.Implicits._

import java.io.File
Expand All @@ -34,24 +34,15 @@ class FlinkVersion(val flinkHome: String) extends Serializable with Logger {

private[this] lazy val FLINK_VERSION_PATTERN = Pattern.compile("^Version: (.*), Commit ID: (.*)$")

private[this] lazy val FLINK_SCALA_VERSION_PATTERN =
Pattern.compile("^flink-dist_(\\d\\.\\d+).*.jar$")

private[this] lazy val APACHE_FLINK_VERSION_PATTERN = Pattern.compile("(^\\d+\\.\\d+\\.\\d+)")

private[this] lazy val OTHER_FLINK_VERSION_PATTERN = Pattern.compile("(\\d+\\.\\d+)(-*)")

lazy val scalaVersion: String = {
val matcher = FLINK_SCALA_VERSION_PATTERN.matcher(flinkDistJar.getName)
if (matcher.matches()) {
matcher.group(1)
} else {
// flink 1.15 + on support scala 2.12
"2.12"
}
}
private[this] lazy val FLINK_DIST_UNDERSCORE_PATTERN =
Pattern.compile("^flink-dist_(\\d+\\.\\d+)-(\\d+\\.\\d+(?:\\.\\d+)?(?:-SNAPSHOT)?)\\.jar$")

lazy val fullVersion: String = s"${version}_$scalaVersion"
private[this] lazy val FLINK_DIST_DASH_PATTERN =
Pattern.compile("^flink-dist-(\\d+\\.\\d+(?:\\.\\d+)?(?:-SNAPSHOT)?)\\.jar$")

lazy val flinkLib: File = {
require(flinkHome != null, "[StreamPark] flinkHome must not be null.")
Expand All @@ -63,44 +54,40 @@ class FlinkVersion(val flinkHome: String) extends Serializable with Logger {
lib
}

lazy val flinkLibs: List[URL] = flinkLib.listFiles().map(_.toURI.toURL).toList
lazy val flinkDistJar: File = {
val distJar = flinkLib.listFiles().filter(_.getName.matches("flink-dist.*\\.jar"))
distJar match {
case x if x.isEmpty =>
throw new IllegalArgumentException(s"[StreamPark] can no found flink-dist jar in $flinkLib")
case x if x.length > 1 =>
throw new IllegalArgumentException(
s"[StreamPark] found multiple flink-dist jar in $flinkLib")
case _ =>
}
distJar.head
}

lazy val version: String = {
val cmd = List(
s"java -classpath ${flinkDistJar.getName} org.apache.flink.client.cli.CliFrontend --version")
var flinkVersion: String = null
val buffer = new mutable.StringBuilder
CommandUtils.execute(
flinkLib.getAbsolutePath,
cmd,
new Consumer[String]() {
override def accept(out: String): Unit = {
buffer.append(out).append("\n")
val matcher = FLINK_VERSION_PATTERN.matcher(out)
if (matcher.find) {
val version = matcher.group(1)
val matcher1 = APACHE_FLINK_VERSION_PATTERN.matcher(version)
if (matcher1.find) {
flinkVersion = version
} else {
val matcher2 = OTHER_FLINK_VERSION_PATTERN.matcher(version)
if (matcher2.find) {
flinkVersion = version
}
}
}
}
})
lazy val flinkLibs: List[URL] = flinkLib.listFiles().map(_.toURI.toURL).toList

logInfo(buffer.toString())
if (flinkVersion == null) {
throw new IllegalStateException(s"[StreamPark] parse flink version failed. $buffer")
}
buffer.clear()
flinkVersion
private lazy val parsedVersion: (String, String) = {
parseFromDistJar()
.orElse(parseFromCliFrontend())
.getOrElse(
throw new IllegalStateException(
s"[StreamPark] parse flink version failed for flinkHome: $flinkHome. " +
"Please check whether $FLINK_HOME/lib/flink-dist*.jar exists."))
}

// flink major version, like "1.13", "1.14"
lazy val version: String = parsedVersion._1

lazy val scalaVersion: String = parsedVersion._2

lazy val fullVersion: String = s"${version}_$scalaVersion"

/** Resolved JAVA_HOME for Flink CLI and cluster-side JVM options. */
lazy val javaHome: Option[String] = FlinkEnvUtils.resolveJavaHome(flinkHome, version)

// flink major version, like "1.13", "2.2"
lazy val majorVersion: String = {
if (version == null) {
null
Expand All @@ -111,22 +98,10 @@ class FlinkVersion(val flinkHome: String) extends Serializable with Logger {
}
}

lazy val flinkDistJar: File = {
val distJar = flinkLib.listFiles().filter(_.getName.matches("flink-dist.*\\.jar"))
distJar match {
case x if x.isEmpty =>
throw new IllegalArgumentException(s"[StreamPark] can no found flink-dist jar in $flinkLib")
case x if x.length > 1 =>
throw new IllegalArgumentException(
s"[StreamPark] found multiple flink-dist jar in $flinkLib")
case _ =>
}
distJar.head
}

def checkVersion(throwException: Boolean = true): Boolean = {
version.split("\\.").map(_.trim.toInt) match {
case Array(1, v, _) if v >= 12 && v <= 20 => true
case Array(2, v, _) if v >= 0 && v <= 2 => true
case _ =>
if (throwException) {
throw new UnsupportedOperationException(s"Unsupported flink version: $version")
Expand All @@ -139,13 +114,80 @@ class FlinkVersion(val flinkHome: String) extends Serializable with Logger {
def checkVersion(sinceVersion: Int): Boolean = {
version.split("\\.").map(_.trim.toInt) match {
case Array(1, v, _) if v >= sinceVersion => true
case Array(2, _, _) => true
case _ => false
}
}

// StreamPark flink shims version, like "streampark-flink-shims_flink-1.13"
private lazy val shimsVersion: String = s"streampark-flink-shims_flink-$majorVersion"

private def parseFromDistJar(): Option[(String, String)] = {
val jarName = flinkDistJar.getName
val underscoreMatcher = FLINK_DIST_UNDERSCORE_PATTERN.matcher(jarName)
if (underscoreMatcher.matches()) {
val parsed = underscoreMatcher.group(2) -> underscoreMatcher.group(1)
logInfo(s"Flink version parsed from dist jar name: ${parsed._1}, scala: ${parsed._2}")
Some(parsed)
} else {
val dashMatcher = FLINK_DIST_DASH_PATTERN.matcher(jarName)
if (dashMatcher.matches()) {
val parsed = dashMatcher.group(1) -> "2.12"
logInfo(s"Flink version parsed from dist jar name: ${parsed._1}, scala: ${parsed._2}")
Some(parsed)
} else {
None
}
}
}

private def hintFlinkVersion(): String = parseFromDistJar().map(_._1).getOrElse("1.20.0")

private def parseFromCliFrontend(): Option[(String, String)] = {
var flinkVersion: String = null
val buffer = new mutable.StringBuilder
val javaHomeExport = FlinkEnvUtils
.resolveJavaHome(flinkHome, hintFlinkVersion())
.map(javaHome => s"export JAVA_HOME=$javaHome&&")
.getOrElse("")
val javaCmd = FlinkEnvUtils
.resolveJavaHome(flinkHome, hintFlinkVersion())
.map(_ + "/bin/java")
.getOrElse("java")
val cmd = List(
s"${javaHomeExport}$javaCmd -classpath ${flinkDistJar.getName} org.apache.flink.client.cli.CliFrontend --version")
CommandUtils.execute(
flinkLib.getAbsolutePath,
cmd,
new Consumer[String]() {
override def accept(out: String): Unit = {
buffer.append(out).append("\n")
val matcher = FLINK_VERSION_PATTERN.matcher(out)
if (matcher.find) {
val parsedVersion = matcher.group(1)
val matcher1 = APACHE_FLINK_VERSION_PATTERN.matcher(parsedVersion)
if (matcher1.find) {
flinkVersion = parsedVersion
} else {
val matcher2 = OTHER_FLINK_VERSION_PATTERN.matcher(parsedVersion)
if (matcher2.find) {
flinkVersion = parsedVersion
}
}
}
}
})

logInfo(buffer.toString())
if (flinkVersion == null) {
None
} else {
val scalaVer = parseFromDistJar().map(_._2).getOrElse("2.12")
logInfo(s"Flink version parsed from CliFrontend: $flinkVersion, scala: $scalaVer")
Some(flinkVersion -> scalaVer)
}
}

override def toString: String =
s"""
|----------------------------------------- flink version -----------------------------------
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -139,20 +139,49 @@ object ClassLoaderUtils extends Logger {

@throws[Exception]
private[this] def addURL(file: File): Unit = {
val classLoader = ClassLoader.getSystemClassLoader
val url = file.toURI.toURL
val classLoaders = Seq(
Option(Thread.currentThread().getContextClassLoader),
Option(ClassLoader.getSystemClassLoader)).flatten.distinct

var lastError: Exception = null
classLoaders.foreach { classLoader =>
try {
addURLToClasspath(classLoader, url)
return
} catch {
case e: Exception => lastError = e
}
}
throw lastError
}

private[this] def addURLToClasspath(classLoader: ClassLoader, url: URL): Unit = {
classLoader match {
case c if c.isInstanceOf[URLClassLoader] =>
val addURL = classOf[URLClassLoader].getDeclaredMethod("addURL", Array(classOf[URL]): _*)
case urlClassLoader: URLClassLoader =>
val addURL =
classOf[URLClassLoader].getDeclaredMethod("addURL", Array(classOf[URL]): _*)
addURL.setAccessible(true)
addURL.invoke(c, file.toURI.toURL)
addURL.invoke(urlClassLoader, url)
case _ =>
val field = classLoader.getClass.getDeclaredField("ucp")
field.setAccessible(true)
val ucp = field.get(classLoader)
var clazz: Class[_] = classLoader.getClass
var ucpField: java.lang.reflect.Field = null
while (clazz != null && ucpField == null) {
try {
ucpField = clazz.getDeclaredField("ucp")
} catch {
case _: NoSuchFieldException => clazz = clazz.getSuperclass
}
}
if (ucpField == null) {
throw new NoSuchFieldException("ucp")
}
ucpField.setAccessible(true)
val ucp = ucpField.get(classLoader)
val addURL =
ucp.getClass.getDeclaredMethod("addURL", Array(classOf[URL]): _*)
addURL.setAccessible(true)
addURL.invoke(ucp, file.toURI.toURL)
addURL.invoke(ucp, url)
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,100 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.streampark.common.util

import java.io.File
import java.nio.charset.StandardCharsets
import java.util.regex.Pattern

import scala.util.Try

object FlinkEnvUtils extends Logger {

private[this] lazy val JAVA_HOME_PATTERN =
Pattern.compile("""(?:^|\n)\s*(?:export\s+)?JAVA_HOME\s*=\s*(?:["']([^"']+)["']|(\S+))""")

/** Minimum Java major version required by the given Flink version string. */
def requiredJavaMajorVersion(flinkVersion: String): Int = {
flinkVersion.split("\\.").headOption.flatMap(v => Try(v.trim.toInt).toOption) match {
case Some(major) if major >= 2 => 11
case _ => 8
}
}

/**
* Resolve JAVA_HOME for Flink CLI and cluster-side JVM options.
*
* Resolution order:
* 1. `$FLINK_HOME/conf/flink-env.sh`
* 2. process environment `JAVA_HOME`
* 3. system auto-detection (macOS `/usr/libexec/java_home`, common Linux paths)
*/
def resolveJavaHome(flinkHome: String, flinkVersion: String): Option[String] = {
val minVersion = requiredJavaMajorVersion(flinkVersion)
parseJavaHomeFromFlinkEnv(flinkHome)
.filter(isValidJavaHome)
.orElse(Option(System.getenv("JAVA_HOME")).filter(isValidJavaHome))
.orElse(detectSystemJavaHome(minVersion).filter(isValidJavaHome))
}

def parseJavaHomeFromFlinkEnv(flinkHome: String): Option[String] = {
val flinkEnvFile = new File(flinkHome, "conf/flink-env.sh")
if (!flinkEnvFile.exists()) {
None
} else {
val content = org.apache.commons.io.FileUtils.readFileToString(flinkEnvFile, StandardCharsets.UTF_8)
extractJavaHome(content)
}
}

private[util] def extractJavaHome(content: String): Option[String] = {
val matcher = JAVA_HOME_PATTERN.matcher(content)
var result: Option[String] = None
while (matcher.find() && result.isEmpty) {
val value = Option(matcher.group(1)).getOrElse(matcher.group(2))
if (value != null && value.nonEmpty && !value.startsWith("#")) {
result = Some(value.trim)
}
}
result
}

private def detectSystemJavaHome(minMajor: Int): Option[String] = {
val os = System.getProperty("os.name", "").toLowerCase
if (os.contains("mac")) {
Try {
val (code, output) = CommandUtils.execute(s"/usr/libexec/java_home -v $minMajor 2>/dev/null")
if (code == 0 && output.trim.nonEmpty) Some(output.trim) else None
}.getOrElse(None)
} else {
val candidates = List(
Option(System.getenv(s"JAVA${minMajor}_HOME")),
Option(s"/usr/lib/jvm/java-$minMajor-openjdk"),
Option(s"/usr/lib/jvm/java-$minMajor-openjdk-amd64"),
Option(s"/usr/lib/jvm/java-$minMajor"))
.flatten
.filter(isValidJavaHome)
candidates.headOption
}
}

private def isValidJavaHome(javaHome: String): Boolean = {
javaHome != null && javaHome.nonEmpty && new File(javaHome, "bin/java").exists()
}

}
Loading
Loading