diff --git a/docs/layouts/shortcodes/generated/environment_configuration.html b/docs/layouts/shortcodes/generated/environment_configuration.html index 6f520e5abff1d..84712c412cd82 100644 --- a/docs/layouts/shortcodes/generated/environment_configuration.html +++ b/docs/layouts/shortcodes/generated/environment_configuration.html @@ -38,6 +38,12 @@ String A string of default JVM options to prepend to env.java.opts.taskmanager. This is intended to be set by administrators. + +
env.java.home
+ (none) + String + Location where Java is installed. If not specified, Flink will use your default Java installation. +
env.java.opts.all
(none) diff --git a/flink-core/src/main/java/org/apache/flink/configuration/ConfigConstants.java b/flink-core/src/main/java/org/apache/flink/configuration/ConfigConstants.java index 58a33a6060a2f..5053f48ae0835 100644 --- a/flink-core/src/main/java/org/apache/flink/configuration/ConfigConstants.java +++ b/flink-core/src/main/java/org/apache/flink/configuration/ConfigConstants.java @@ -1740,6 +1740,8 @@ public final class ConfigConstants { // ----------------------------- Environment Variables ---------------------------- + public static final String ENV_JAVA_HOME = "JAVA_HOME"; + /** The environment variable name which contains the location of the configuration directory. */ public static final String ENV_FLINK_CONF_DIR = "FLINK_CONF_DIR"; diff --git a/flink-core/src/main/java/org/apache/flink/configuration/ConfigurationUtils.java b/flink-core/src/main/java/org/apache/flink/configuration/ConfigurationUtils.java index 9cbf8810023e1..f1f72200b1c95 100755 --- a/flink-core/src/main/java/org/apache/flink/configuration/ConfigurationUtils.java +++ b/flink-core/src/main/java/org/apache/flink/configuration/ConfigurationUtils.java @@ -39,6 +39,7 @@ import java.util.concurrent.ThreadLocalRandom; import java.util.stream.Collectors; +import static org.apache.flink.configuration.ConfigConstants.ENV_JAVA_HOME; import static org.apache.flink.configuration.MetricOptions.SYSTEM_RESOURCE_METRICS; import static org.apache.flink.configuration.MetricOptions.SYSTEM_RESOURCE_METRICS_PROBING_INTERVAL; import static org.apache.flink.configuration.StructuredOptionsSplitter.escapeWithSingleQuote; @@ -665,6 +666,29 @@ public static boolean filterPrefixMapKey(String key, String candidate) { return candidate.startsWith(prefixKey); } + /** + * Set the JAVA_HOME variable in the provided environment map. + * + *

This method follows a specific priority order to determine the JAVA_HOME value: + * + *

    + *
  1. If the environment map already contains the JAVA_HOME key, the method does nothing. + *
  2. Otherwise, it attempts to retrieve JAVA_HOME from the Flink configuration using {@link + * CoreOptions#FLINK_JAVA_HOME}. + *
  3. If it isn't found in configuration, it falls back to the system environment variable. + *
  4. If a value is found through either source, it is added to the environment map. + *
+ */ + public static void setJavaHomeEnv(Configuration configuration, Map env) { + if (!env.containsKey(ENV_JAVA_HOME)) { + Optional.ofNullable( + configuration + .getOptional(CoreOptions.FLINK_JAVA_HOME) + .orElse(System.getenv(ENV_JAVA_HOME))) + .ifPresent(javaHomeStr -> env.put(ENV_JAVA_HOME, javaHomeStr)); + } + } + static Map convertToPropertiesPrefixed( Map confData, String key, boolean standardYaml) { final String prefixKey = key + "."; diff --git a/flink-core/src/main/java/org/apache/flink/configuration/CoreOptions.java b/flink-core/src/main/java/org/apache/flink/configuration/CoreOptions.java index c4d50ce1072df..59ab46c4506b1 100644 --- a/flink-core/src/main/java/org/apache/flink/configuration/CoreOptions.java +++ b/flink-core/src/main/java/org/apache/flink/configuration/CoreOptions.java @@ -225,6 +225,17 @@ public static String[] mergeListsToArray(List base, List append) // process parameters // ------------------------------------------------------------------------ + public static final ConfigOption FLINK_JAVA_HOME = + ConfigOptions.key("env.java.home") + .stringType() + .noDefaultValue() + .withDescription( + Description.builder() + .text( + "Location where Java is installed. If not specified," + + " Flink will use your default Java installation.") + .build()); + public static final ConfigOption FLINK_JVM_OPTIONS = ConfigOptions.key("env.java.opts.all") .stringType() diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/ContaineredTaskManagerParameters.java b/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/ContaineredTaskManagerParameters.java index ba84f97cdb304..1ba09e6335d5a 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/ContaineredTaskManagerParameters.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/ContaineredTaskManagerParameters.java @@ -19,6 +19,7 @@ package org.apache.flink.runtime.clusterframework; import org.apache.flink.configuration.Configuration; +import org.apache.flink.configuration.ConfigurationUtils; import org.apache.flink.configuration.ResourceManagerOptions; import java.util.HashMap; @@ -90,6 +91,9 @@ public static ContaineredTaskManagerParameters create( } } + // set JAVA_HOME + ConfigurationUtils.setJavaHomeEnv(config, envVars); + // done return new ContaineredTaskManagerParameters(taskExecutorProcessSpec, envVars); } diff --git a/flink-yarn/src/main/java/org/apache/flink/yarn/YarnClusterDescriptor.java b/flink-yarn/src/main/java/org/apache/flink/yarn/YarnClusterDescriptor.java index 63043af367d67..2ab222afe8a9b 100644 --- a/flink-yarn/src/main/java/org/apache/flink/yarn/YarnClusterDescriptor.java +++ b/flink-yarn/src/main/java/org/apache/flink/yarn/YarnClusterDescriptor.java @@ -1969,6 +1969,9 @@ Map generateApplicationMasterEnv( ConfigurationUtils.getPrefixedKeyValuePairs( ResourceManagerOptions.CONTAINERIZED_MASTER_ENV_PREFIX, this.flinkConfiguration)); + + // set JAVA_HOME + ConfigurationUtils.setJavaHomeEnv(this.flinkConfiguration, env); // set Flink app class path env.put(ENV_FLINK_CLASSPATH, classPathStr); // Set FLINK_LIB_DIR to `lib` folder under working dir in container diff --git a/flink-yarn/src/test/java/org/apache/flink/yarn/UtilsTest.java b/flink-yarn/src/test/java/org/apache/flink/yarn/UtilsTest.java index 559071559d7e6..66cba0debd88c 100644 --- a/flink-yarn/src/test/java/org/apache/flink/yarn/UtilsTest.java +++ b/flink-yarn/src/test/java/org/apache/flink/yarn/UtilsTest.java @@ -24,6 +24,7 @@ import org.apache.flink.configuration.Configuration; import org.apache.flink.configuration.CoreOptions; import org.apache.flink.configuration.MemorySize; +import org.apache.flink.core.testutils.CommonTestUtils; import org.apache.flink.runtime.clusterframework.ContaineredTaskManagerParameters; import org.apache.flink.runtime.clusterframework.TaskExecutorProcessSpec; import org.apache.flink.runtime.clusterframework.TaskExecutorProcessUtils; @@ -44,8 +45,11 @@ import java.util.Collections; import java.util.HashMap; import java.util.List; +import java.util.Map; import java.util.stream.Stream; +import static org.apache.flink.configuration.ConfigConstants.ENV_JAVA_HOME; +import static org.apache.flink.configuration.ResourceManagerOptions.CONTAINERIZED_TASK_MANAGER_ENV_PREFIX; import static org.apache.flink.yarn.configuration.YarnConfigOptions.YARN_CONTAINER_START_COMMAND_TEMPLATE; import static org.assertj.core.api.Assertions.assertThat; import static org.assertj.core.api.Assertions.assertThatThrownBy; @@ -56,6 +60,19 @@ class UtilsTest { private static final String YARN_RM_ARBITRARY_SCHEDULER_CLAZZ = "org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacityScheduler"; + private static final TaskExecutorProcessSpec TASK_EXECUTOR_PROCESS_SPEC = + new TaskExecutorProcessSpec( + new CPUResource(1.0), + new MemorySize(0), // frameworkHeapSize + new MemorySize(0), // frameworkOffHeapSize + new MemorySize(111), // taskHeapSize + new MemorySize(0), // taskOffHeapSize + new MemorySize(222), // networkMemSize + new MemorySize(0), // managedMemorySize + new MemorySize(333), // jvmMetaspaceSize + new MemorySize(0), // jvmOverheadSize + Collections.emptyList()); + @Test void testDeleteApplicationFiles(@TempDir Path tempDir) throws Exception { final Path applicationFilesDir = Files.createTempDirectory(tempDir, ".flink"); @@ -208,20 +225,8 @@ void testGetYarnConfiguration() { @Test void testGetTaskManagerShellCommand() { final Configuration cfg = new Configuration(); - final TaskExecutorProcessSpec taskExecutorProcessSpec = - new TaskExecutorProcessSpec( - new CPUResource(1.0), - new MemorySize(0), // frameworkHeapSize - new MemorySize(0), // frameworkOffHeapSize - new MemorySize(111), // taskHeapSize - new MemorySize(0), // taskOffHeapSize - new MemorySize(222), // networkMemSize - new MemorySize(0), // managedMemorySize - new MemorySize(333), // jvmMetaspaceSize - new MemorySize(0), // jvmOverheadSize - Collections.emptyList()); final ContaineredTaskManagerParameters containeredParams = - new ContaineredTaskManagerParameters(taskExecutorProcessSpec, new HashMap<>()); + new ContaineredTaskManagerParameters(TASK_EXECUTOR_PROCESS_SPEC, new HashMap<>()); // no logging, with/out krb5 final String java = "$JAVA_HOME/bin/java"; @@ -238,7 +243,8 @@ void testGetTaskManagerShellCommand() { + " -Dlog4j.configurationFile=file:./conf/log4j.properties"; // if set final String mainClass = "org.apache.flink.yarn.UtilsTest"; final String dynamicConfigs = - TaskExecutorProcessUtils.generateDynamicConfigsStr(taskExecutorProcessSpec).trim(); + TaskExecutorProcessUtils.generateDynamicConfigsStr(TASK_EXECUTOR_PROCESS_SPEC) + .trim(); final String basicArgs = "--configDir ./conf"; final String mainArgs = "-Djobmanager.rpc.address=host1 -Dkey.a=v1"; final String args = dynamicConfigs + " " + basicArgs + " " + mainArgs; @@ -674,6 +680,50 @@ void testGenerateJvmOptsString() { Utils.IGNORE_UNRECOGNIZED_VM_OPTIONS)); } + @Test + void testGetTaskManagerEnvsWithJavaHomeSet() { + final Configuration cfg = new Configuration(); + cfg.set(CoreOptions.FLINK_JAVA_HOME, "/opt/jdk"); + cfg.setString(CONTAINERIZED_TASK_MANAGER_ENV_PREFIX + "key", "val"); + final ContaineredTaskManagerParameters containeredParams = + ContaineredTaskManagerParameters.create(cfg, TASK_EXECUTOR_PROCESS_SPEC); + final Map envVars = containeredParams.taskManagerEnv(); + assertThat(envVars).containsEntry(ENV_JAVA_HOME, "/opt/jdk").containsEntry("key", "val"); + } + + @Test + void testGetTaskManagerEnvsWithEnvJavaHomeSet() { + final Configuration cfg = new Configuration(); + final String newJavaHome = "/usr/lib/jvm/java-openjdk-17"; + final Map oldEnv = System.getenv(); + try { + Map newEnv = new HashMap<>(System.getenv()); + newEnv.put(ConfigConstants.ENV_JAVA_HOME, newJavaHome); + CommonTestUtils.setEnv(newEnv); + + cfg.setString(CONTAINERIZED_TASK_MANAGER_ENV_PREFIX + "key", "val"); + final ContaineredTaskManagerParameters containeredParams = + ContaineredTaskManagerParameters.create(cfg, TASK_EXECUTOR_PROCESS_SPEC); + final Map envVars = containeredParams.taskManagerEnv(); + assertThat(envVars) + .containsEntry(ENV_JAVA_HOME, newJavaHome) + .containsEntry("key", "val"); + } finally { + CommonTestUtils.setEnv(oldEnv); + } + } + + @Test + void testGetTaskManagerEnvsWithoutJavaHomeSet() { + final Configuration cfg = new Configuration(); + final String origJavaHome = System.getenv(ConfigConstants.ENV_JAVA_HOME); + cfg.setString(CONTAINERIZED_TASK_MANAGER_ENV_PREFIX + "key", "val"); + final ContaineredTaskManagerParameters containeredParams = + ContaineredTaskManagerParameters.create(cfg, TASK_EXECUTOR_PROCESS_SPEC); + final Map envVars = containeredParams.taskManagerEnv(); + assertThat(envVars.get(ConfigConstants.ENV_JAVA_HOME)).isEqualTo(origJavaHome); + } + private static void verifyUnitResourceVariousSchedulers( YarnConfiguration yarnConfig, int minMem, int minVcore, int incMem, int incVcore) { yarnConfig.set(YarnConfiguration.RM_SCHEDULER, Utils.YARN_RM_FAIR_SCHEDULER_CLAZZ); diff --git a/flink-yarn/src/test/java/org/apache/flink/yarn/YarnClusterDescriptorTest.java b/flink-yarn/src/test/java/org/apache/flink/yarn/YarnClusterDescriptorTest.java index f87313cf21bb9..c60af24537f4b 100644 --- a/flink-yarn/src/test/java/org/apache/flink/yarn/YarnClusterDescriptorTest.java +++ b/flink-yarn/src/test/java/org/apache/flink/yarn/YarnClusterDescriptorTest.java @@ -921,11 +921,14 @@ public void testGenerateApplicationMasterEnv(@TempDir File flinkHomeDir) throws final String fakeLocalFlinkJar = "./lib/flink_dist.jar"; final String fakeClassPath = fakeLocalFlinkJar + ":./usrlib/user.jar"; final ApplicationId appId = ApplicationId.newInstance(0, 0); + final Configuration flinkConfig = new Configuration(); + flinkConfig.set(CoreOptions.FLINK_JAVA_HOME, "/opt/jdk"); final Map masterEnv = getTestMasterEnv( - new Configuration(), flinkHomeDir, fakeClassPath, fakeLocalFlinkJar, appId); + flinkConfig, flinkHomeDir, fakeClassPath, fakeLocalFlinkJar, appId); assertThat(masterEnv) + .containsEntry(ConfigConstants.ENV_JAVA_HOME, "/opt/jdk") .containsEntry(ConfigConstants.ENV_FLINK_LIB_DIR, "./lib") .containsEntry(YarnConfigKeys.ENV_APP_ID, appId.toString()) .containsEntry( @@ -940,6 +943,44 @@ public void testGenerateApplicationMasterEnv(@TempDir File flinkHomeDir) throws .containsEntry(YarnConfigKeys.ENV_CLIENT_HOME_DIR, flinkHomeDir.getPath()); } + @Test + public void testContainerEnvJavaHomeNotOverriddenByDefault(@TempDir File flinkHomeDir) + throws IOException { + final Configuration flinkConfig = new Configuration(); + final String origJavaHome = System.getenv(ConfigConstants.ENV_JAVA_HOME); + final Map masterEnv = + getTestMasterEnv( + flinkConfig, + flinkHomeDir, + "", + "./lib/flink_dist.jar", + ApplicationId.newInstance(0, 0)); + assertThat(masterEnv.get(ConfigConstants.ENV_JAVA_HOME)).isEqualTo(origJavaHome); + } + + @Test + public void testContainerEnvJavaHomeNewValue(@TempDir File flinkHomeDir) throws IOException { + final Configuration flinkConfig = new Configuration(); + final String newJavaHome = "/usr/lib/jvm/java-openjdk-17"; + final Map oldEnv = System.getenv(); + + try { + Map newEnv = new HashMap<>(System.getenv()); + newEnv.put(ConfigConstants.ENV_JAVA_HOME, newJavaHome); + CommonTestUtils.setEnv(newEnv); + final Map masterEnv = + getTestMasterEnv( + flinkConfig, + flinkHomeDir, + "", + "./lib/flink_dist.jar", + ApplicationId.newInstance(0, 0)); + assertThat(masterEnv.get(ConfigConstants.ENV_JAVA_HOME)).isEqualTo(newJavaHome); + } finally { + CommonTestUtils.setEnv(oldEnv); + } + } + @Test public void testEnvFlinkLibDirVarNotOverriddenByContainerEnv(@TempDir File tmpDir) throws IOException {