diff --git a/docs/layouts/shortcodes/generated/environment_configuration.html b/docs/layouts/shortcodes/generated/environment_configuration.html
index 6f520e5abff1d..84712c412cd82 100644
--- a/docs/layouts/shortcodes/generated/environment_configuration.html
+++ b/docs/layouts/shortcodes/generated/environment_configuration.html
@@ -38,6 +38,12 @@
String |
A string of default JVM options to prepend to env.java.opts.taskmanager. This is intended to be set by administrators. |
+
+ env.java.home |
+ (none) |
+ String |
+ Location where Java is installed. If not specified, Flink will use your default Java installation. |
+
env.java.opts.all |
(none) |
diff --git a/flink-core/src/main/java/org/apache/flink/configuration/ConfigConstants.java b/flink-core/src/main/java/org/apache/flink/configuration/ConfigConstants.java
index 58a33a6060a2f..5053f48ae0835 100644
--- a/flink-core/src/main/java/org/apache/flink/configuration/ConfigConstants.java
+++ b/flink-core/src/main/java/org/apache/flink/configuration/ConfigConstants.java
@@ -1740,6 +1740,8 @@ public final class ConfigConstants {
// ----------------------------- Environment Variables ----------------------------
+ public static final String ENV_JAVA_HOME = "JAVA_HOME";
+
/** The environment variable name which contains the location of the configuration directory. */
public static final String ENV_FLINK_CONF_DIR = "FLINK_CONF_DIR";
diff --git a/flink-core/src/main/java/org/apache/flink/configuration/ConfigurationUtils.java b/flink-core/src/main/java/org/apache/flink/configuration/ConfigurationUtils.java
index 9cbf8810023e1..f1f72200b1c95 100755
--- a/flink-core/src/main/java/org/apache/flink/configuration/ConfigurationUtils.java
+++ b/flink-core/src/main/java/org/apache/flink/configuration/ConfigurationUtils.java
@@ -39,6 +39,7 @@
import java.util.concurrent.ThreadLocalRandom;
import java.util.stream.Collectors;
+import static org.apache.flink.configuration.ConfigConstants.ENV_JAVA_HOME;
import static org.apache.flink.configuration.MetricOptions.SYSTEM_RESOURCE_METRICS;
import static org.apache.flink.configuration.MetricOptions.SYSTEM_RESOURCE_METRICS_PROBING_INTERVAL;
import static org.apache.flink.configuration.StructuredOptionsSplitter.escapeWithSingleQuote;
@@ -665,6 +666,29 @@ public static boolean filterPrefixMapKey(String key, String candidate) {
return candidate.startsWith(prefixKey);
}
+ /**
+ * Set the JAVA_HOME variable in the provided environment map.
+ *
+ * This method follows a specific priority order to determine the JAVA_HOME value:
+ *
+ *
+ * - If the environment map already contains the JAVA_HOME key, the method does nothing.
+ *
- Otherwise, it attempts to retrieve JAVA_HOME from the Flink configuration using {@link
+ * CoreOptions#FLINK_JAVA_HOME}.
+ *
- If it isn't found in configuration, it falls back to the system environment variable.
+ *
- If a value is found through either source, it is added to the environment map.
+ *
+ */
+ public static void setJavaHomeEnv(Configuration configuration, Map env) {
+ if (!env.containsKey(ENV_JAVA_HOME)) {
+ Optional.ofNullable(
+ configuration
+ .getOptional(CoreOptions.FLINK_JAVA_HOME)
+ .orElse(System.getenv(ENV_JAVA_HOME)))
+ .ifPresent(javaHomeStr -> env.put(ENV_JAVA_HOME, javaHomeStr));
+ }
+ }
+
static Map convertToPropertiesPrefixed(
Map confData, String key, boolean standardYaml) {
final String prefixKey = key + ".";
diff --git a/flink-core/src/main/java/org/apache/flink/configuration/CoreOptions.java b/flink-core/src/main/java/org/apache/flink/configuration/CoreOptions.java
index c4d50ce1072df..59ab46c4506b1 100644
--- a/flink-core/src/main/java/org/apache/flink/configuration/CoreOptions.java
+++ b/flink-core/src/main/java/org/apache/flink/configuration/CoreOptions.java
@@ -225,6 +225,17 @@ public static String[] mergeListsToArray(List base, List append)
// process parameters
// ------------------------------------------------------------------------
+ public static final ConfigOption FLINK_JAVA_HOME =
+ ConfigOptions.key("env.java.home")
+ .stringType()
+ .noDefaultValue()
+ .withDescription(
+ Description.builder()
+ .text(
+ "Location where Java is installed. If not specified,"
+ + " Flink will use your default Java installation.")
+ .build());
+
public static final ConfigOption FLINK_JVM_OPTIONS =
ConfigOptions.key("env.java.opts.all")
.stringType()
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/ContaineredTaskManagerParameters.java b/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/ContaineredTaskManagerParameters.java
index ba84f97cdb304..1ba09e6335d5a 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/ContaineredTaskManagerParameters.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/ContaineredTaskManagerParameters.java
@@ -19,6 +19,7 @@
package org.apache.flink.runtime.clusterframework;
import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.ConfigurationUtils;
import org.apache.flink.configuration.ResourceManagerOptions;
import java.util.HashMap;
@@ -90,6 +91,9 @@ public static ContaineredTaskManagerParameters create(
}
}
+ // set JAVA_HOME
+ ConfigurationUtils.setJavaHomeEnv(config, envVars);
+
// done
return new ContaineredTaskManagerParameters(taskExecutorProcessSpec, envVars);
}
diff --git a/flink-yarn/src/main/java/org/apache/flink/yarn/YarnClusterDescriptor.java b/flink-yarn/src/main/java/org/apache/flink/yarn/YarnClusterDescriptor.java
index 63043af367d67..2ab222afe8a9b 100644
--- a/flink-yarn/src/main/java/org/apache/flink/yarn/YarnClusterDescriptor.java
+++ b/flink-yarn/src/main/java/org/apache/flink/yarn/YarnClusterDescriptor.java
@@ -1969,6 +1969,9 @@ Map generateApplicationMasterEnv(
ConfigurationUtils.getPrefixedKeyValuePairs(
ResourceManagerOptions.CONTAINERIZED_MASTER_ENV_PREFIX,
this.flinkConfiguration));
+
+ // set JAVA_HOME
+ ConfigurationUtils.setJavaHomeEnv(this.flinkConfiguration, env);
// set Flink app class path
env.put(ENV_FLINK_CLASSPATH, classPathStr);
// Set FLINK_LIB_DIR to `lib` folder under working dir in container
diff --git a/flink-yarn/src/test/java/org/apache/flink/yarn/UtilsTest.java b/flink-yarn/src/test/java/org/apache/flink/yarn/UtilsTest.java
index 559071559d7e6..66cba0debd88c 100644
--- a/flink-yarn/src/test/java/org/apache/flink/yarn/UtilsTest.java
+++ b/flink-yarn/src/test/java/org/apache/flink/yarn/UtilsTest.java
@@ -24,6 +24,7 @@
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.CoreOptions;
import org.apache.flink.configuration.MemorySize;
+import org.apache.flink.core.testutils.CommonTestUtils;
import org.apache.flink.runtime.clusterframework.ContaineredTaskManagerParameters;
import org.apache.flink.runtime.clusterframework.TaskExecutorProcessSpec;
import org.apache.flink.runtime.clusterframework.TaskExecutorProcessUtils;
@@ -44,8 +45,11 @@
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
+import java.util.Map;
import java.util.stream.Stream;
+import static org.apache.flink.configuration.ConfigConstants.ENV_JAVA_HOME;
+import static org.apache.flink.configuration.ResourceManagerOptions.CONTAINERIZED_TASK_MANAGER_ENV_PREFIX;
import static org.apache.flink.yarn.configuration.YarnConfigOptions.YARN_CONTAINER_START_COMMAND_TEMPLATE;
import static org.assertj.core.api.Assertions.assertThat;
import static org.assertj.core.api.Assertions.assertThatThrownBy;
@@ -56,6 +60,19 @@ class UtilsTest {
private static final String YARN_RM_ARBITRARY_SCHEDULER_CLAZZ =
"org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacityScheduler";
+ private static final TaskExecutorProcessSpec TASK_EXECUTOR_PROCESS_SPEC =
+ new TaskExecutorProcessSpec(
+ new CPUResource(1.0),
+ new MemorySize(0), // frameworkHeapSize
+ new MemorySize(0), // frameworkOffHeapSize
+ new MemorySize(111), // taskHeapSize
+ new MemorySize(0), // taskOffHeapSize
+ new MemorySize(222), // networkMemSize
+ new MemorySize(0), // managedMemorySize
+ new MemorySize(333), // jvmMetaspaceSize
+ new MemorySize(0), // jvmOverheadSize
+ Collections.emptyList());
+
@Test
void testDeleteApplicationFiles(@TempDir Path tempDir) throws Exception {
final Path applicationFilesDir = Files.createTempDirectory(tempDir, ".flink");
@@ -208,20 +225,8 @@ void testGetYarnConfiguration() {
@Test
void testGetTaskManagerShellCommand() {
final Configuration cfg = new Configuration();
- final TaskExecutorProcessSpec taskExecutorProcessSpec =
- new TaskExecutorProcessSpec(
- new CPUResource(1.0),
- new MemorySize(0), // frameworkHeapSize
- new MemorySize(0), // frameworkOffHeapSize
- new MemorySize(111), // taskHeapSize
- new MemorySize(0), // taskOffHeapSize
- new MemorySize(222), // networkMemSize
- new MemorySize(0), // managedMemorySize
- new MemorySize(333), // jvmMetaspaceSize
- new MemorySize(0), // jvmOverheadSize
- Collections.emptyList());
final ContaineredTaskManagerParameters containeredParams =
- new ContaineredTaskManagerParameters(taskExecutorProcessSpec, new HashMap<>());
+ new ContaineredTaskManagerParameters(TASK_EXECUTOR_PROCESS_SPEC, new HashMap<>());
// no logging, with/out krb5
final String java = "$JAVA_HOME/bin/java";
@@ -238,7 +243,8 @@ void testGetTaskManagerShellCommand() {
+ " -Dlog4j.configurationFile=file:./conf/log4j.properties"; // if set
final String mainClass = "org.apache.flink.yarn.UtilsTest";
final String dynamicConfigs =
- TaskExecutorProcessUtils.generateDynamicConfigsStr(taskExecutorProcessSpec).trim();
+ TaskExecutorProcessUtils.generateDynamicConfigsStr(TASK_EXECUTOR_PROCESS_SPEC)
+ .trim();
final String basicArgs = "--configDir ./conf";
final String mainArgs = "-Djobmanager.rpc.address=host1 -Dkey.a=v1";
final String args = dynamicConfigs + " " + basicArgs + " " + mainArgs;
@@ -674,6 +680,50 @@ void testGenerateJvmOptsString() {
Utils.IGNORE_UNRECOGNIZED_VM_OPTIONS));
}
+ @Test
+ void testGetTaskManagerEnvsWithJavaHomeSet() {
+ final Configuration cfg = new Configuration();
+ cfg.set(CoreOptions.FLINK_JAVA_HOME, "/opt/jdk");
+ cfg.setString(CONTAINERIZED_TASK_MANAGER_ENV_PREFIX + "key", "val");
+ final ContaineredTaskManagerParameters containeredParams =
+ ContaineredTaskManagerParameters.create(cfg, TASK_EXECUTOR_PROCESS_SPEC);
+ final Map envVars = containeredParams.taskManagerEnv();
+ assertThat(envVars).containsEntry(ENV_JAVA_HOME, "/opt/jdk").containsEntry("key", "val");
+ }
+
+ @Test
+ void testGetTaskManagerEnvsWithEnvJavaHomeSet() {
+ final Configuration cfg = new Configuration();
+ final String newJavaHome = "/usr/lib/jvm/java-openjdk-17";
+ final Map oldEnv = System.getenv();
+ try {
+ Map newEnv = new HashMap<>(System.getenv());
+ newEnv.put(ConfigConstants.ENV_JAVA_HOME, newJavaHome);
+ CommonTestUtils.setEnv(newEnv);
+
+ cfg.setString(CONTAINERIZED_TASK_MANAGER_ENV_PREFIX + "key", "val");
+ final ContaineredTaskManagerParameters containeredParams =
+ ContaineredTaskManagerParameters.create(cfg, TASK_EXECUTOR_PROCESS_SPEC);
+ final Map envVars = containeredParams.taskManagerEnv();
+ assertThat(envVars)
+ .containsEntry(ENV_JAVA_HOME, newJavaHome)
+ .containsEntry("key", "val");
+ } finally {
+ CommonTestUtils.setEnv(oldEnv);
+ }
+ }
+
+ @Test
+ void testGetTaskManagerEnvsWithoutJavaHomeSet() {
+ final Configuration cfg = new Configuration();
+ final String origJavaHome = System.getenv(ConfigConstants.ENV_JAVA_HOME);
+ cfg.setString(CONTAINERIZED_TASK_MANAGER_ENV_PREFIX + "key", "val");
+ final ContaineredTaskManagerParameters containeredParams =
+ ContaineredTaskManagerParameters.create(cfg, TASK_EXECUTOR_PROCESS_SPEC);
+ final Map envVars = containeredParams.taskManagerEnv();
+ assertThat(envVars.get(ConfigConstants.ENV_JAVA_HOME)).isEqualTo(origJavaHome);
+ }
+
private static void verifyUnitResourceVariousSchedulers(
YarnConfiguration yarnConfig, int minMem, int minVcore, int incMem, int incVcore) {
yarnConfig.set(YarnConfiguration.RM_SCHEDULER, Utils.YARN_RM_FAIR_SCHEDULER_CLAZZ);
diff --git a/flink-yarn/src/test/java/org/apache/flink/yarn/YarnClusterDescriptorTest.java b/flink-yarn/src/test/java/org/apache/flink/yarn/YarnClusterDescriptorTest.java
index f87313cf21bb9..c60af24537f4b 100644
--- a/flink-yarn/src/test/java/org/apache/flink/yarn/YarnClusterDescriptorTest.java
+++ b/flink-yarn/src/test/java/org/apache/flink/yarn/YarnClusterDescriptorTest.java
@@ -921,11 +921,14 @@ public void testGenerateApplicationMasterEnv(@TempDir File flinkHomeDir) throws
final String fakeLocalFlinkJar = "./lib/flink_dist.jar";
final String fakeClassPath = fakeLocalFlinkJar + ":./usrlib/user.jar";
final ApplicationId appId = ApplicationId.newInstance(0, 0);
+ final Configuration flinkConfig = new Configuration();
+ flinkConfig.set(CoreOptions.FLINK_JAVA_HOME, "/opt/jdk");
final Map masterEnv =
getTestMasterEnv(
- new Configuration(), flinkHomeDir, fakeClassPath, fakeLocalFlinkJar, appId);
+ flinkConfig, flinkHomeDir, fakeClassPath, fakeLocalFlinkJar, appId);
assertThat(masterEnv)
+ .containsEntry(ConfigConstants.ENV_JAVA_HOME, "/opt/jdk")
.containsEntry(ConfigConstants.ENV_FLINK_LIB_DIR, "./lib")
.containsEntry(YarnConfigKeys.ENV_APP_ID, appId.toString())
.containsEntry(
@@ -940,6 +943,44 @@ public void testGenerateApplicationMasterEnv(@TempDir File flinkHomeDir) throws
.containsEntry(YarnConfigKeys.ENV_CLIENT_HOME_DIR, flinkHomeDir.getPath());
}
+ @Test
+ public void testContainerEnvJavaHomeNotOverriddenByDefault(@TempDir File flinkHomeDir)
+ throws IOException {
+ final Configuration flinkConfig = new Configuration();
+ final String origJavaHome = System.getenv(ConfigConstants.ENV_JAVA_HOME);
+ final Map masterEnv =
+ getTestMasterEnv(
+ flinkConfig,
+ flinkHomeDir,
+ "",
+ "./lib/flink_dist.jar",
+ ApplicationId.newInstance(0, 0));
+ assertThat(masterEnv.get(ConfigConstants.ENV_JAVA_HOME)).isEqualTo(origJavaHome);
+ }
+
+ @Test
+ public void testContainerEnvJavaHomeNewValue(@TempDir File flinkHomeDir) throws IOException {
+ final Configuration flinkConfig = new Configuration();
+ final String newJavaHome = "/usr/lib/jvm/java-openjdk-17";
+ final Map oldEnv = System.getenv();
+
+ try {
+ Map newEnv = new HashMap<>(System.getenv());
+ newEnv.put(ConfigConstants.ENV_JAVA_HOME, newJavaHome);
+ CommonTestUtils.setEnv(newEnv);
+ final Map masterEnv =
+ getTestMasterEnv(
+ flinkConfig,
+ flinkHomeDir,
+ "",
+ "./lib/flink_dist.jar",
+ ApplicationId.newInstance(0, 0));
+ assertThat(masterEnv.get(ConfigConstants.ENV_JAVA_HOME)).isEqualTo(newJavaHome);
+ } finally {
+ CommonTestUtils.setEnv(oldEnv);
+ }
+ }
+
@Test
public void testEnvFlinkLibDirVarNotOverriddenByContainerEnv(@TempDir File tmpDir)
throws IOException {