From 9e5b935a1301a2463ffa7d9ca9570ac2582bb370 Mon Sep 17 00:00:00 2001 From: Deepak Jain Date: Thu, 26 Mar 2026 23:05:43 -0700 Subject: [PATCH] YARN-11851. Make preemptable container tracking thread-safe. --- .../scheduler/fair/FSPreemptionThread.java | 27 ++-- .../fair/TestFSPreemptionThread.java | 126 ++++++++++++++++++ 2 files changed, 140 insertions(+), 13 deletions(-) create mode 100644 hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFSPreemptionThread.java diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSPreemptionThread.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSPreemptionThread.java index 48c0c981a7e19..f09121bedf6ff 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSPreemptionThread.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSPreemptionThread.java @@ -296,7 +296,7 @@ public void run() { * A class to track preemptable containers. */ private static class PreemptableContainers { - Map> containersByApp; + private final Map> containersByApp; int numAMContainers; int maxAMContainers; @@ -313,7 +313,8 @@ private static class PreemptableContainers { * @param container the container to add * @return true if success; false otherwise */ - private boolean addContainer(RMContainer container, ApplicationId appId) { + private synchronized boolean addContainer(RMContainer container, + ApplicationId appId) { if (container.isAMContainer()) { numAMContainers++; if (numAMContainers >= maxAMContainers) { @@ -321,15 +322,12 @@ private boolean addContainer(RMContainer container, ApplicationId appId) { } } - if (!containersByApp.containsKey(appId)) { - containersByApp.put(appId, new ArrayList<>()); - } - - containersByApp.get(appId).add(container); + containersByApp.computeIfAbsent(appId, key -> new ArrayList<>()) + .add(container); return true; } - private List getAllContainers() { + private synchronized List getAllContainers() { List allContainers = new ArrayList<>(); for (List containersForApp : containersByApp.values()) { allContainers.addAll(containersForApp); @@ -337,12 +335,15 @@ private List getAllContainers() { return allContainers; } - private Resource getResourcesToPreemptForApp(ApplicationId appId) { + private synchronized Resource getResourcesToPreemptForApp( + ApplicationId appId) { Resource resourcesToPreempt = Resources.createResource(0, 0); - if (containersByApp.containsKey(appId)) { - for (RMContainer container : containersByApp.get(appId)) { - Resources.addTo(resourcesToPreempt, container.getAllocatedResource()); - } + List containersForApp = containersByApp.get(appId); + if (containersForApp == null) { + return resourcesToPreempt; + } + for (RMContainer container : containersForApp) { + Resources.addTo(resourcesToPreempt, container.getAllocatedResource()); } return resourcesToPreempt; } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFSPreemptionThread.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFSPreemptionThread.java new file mode 100644 index 0000000000000..3b0dc2f1d3dd9 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFSPreemptionThread.java @@ -0,0 +1,126 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair; + +import org.apache.hadoop.yarn.api.records.ApplicationId; +import org.apache.hadoop.yarn.api.records.Resource; +import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer; +import org.apache.hadoop.yarn.util.resource.Resources; +import org.junit.jupiter.api.Test; +import org.mockito.Mockito; + +import java.lang.reflect.Constructor; +import java.lang.reflect.Method; +import java.util.List; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.Future; +import java.util.concurrent.TimeUnit; + +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertTrue; +import static org.mockito.Mockito.when; + +public class TestFSPreemptionThread { + + @Test + public void testPreemptableContainersConcurrentAccess() throws Exception { + Class preemptableContainersClass = Class.forName( + FSPreemptionThread.class.getName() + "$PreemptableContainers"); + Constructor constructor = + preemptableContainersClass.getDeclaredConstructor(int.class); + constructor.setAccessible(true); + Object preemptableContainers = constructor.newInstance(Integer.MAX_VALUE); + + Method addContainer = preemptableContainersClass.getDeclaredMethod( + "addContainer", RMContainer.class, ApplicationId.class); + Method getAllContainers = preemptableContainersClass.getDeclaredMethod( + "getAllContainers"); + addContainer.setAccessible(true); + getAllContainers.setAccessible(true); + + int iterations = 500; + CountDownLatch start = new CountDownLatch(1); + ExecutorService executor = Executors.newFixedThreadPool(2); + try { + Future writer = executor.submit(() -> { + await(start); + for (int i = 0; i < iterations; i++) { + invokeAddContainer(addContainer, preemptableContainers, i); + } + }); + Future reader = executor.submit(() -> { + await(start); + for (int i = 0; i < iterations; i++) { + invokeGetAllContainers(getAllContainers, preemptableContainers); + } + }); + + start.countDown(); + writer.get(); + reader.get(); + } finally { + executor.shutdownNow(); + assertTrue(executor.awaitTermination(30, TimeUnit.SECONDS)); + } + + @SuppressWarnings("unchecked") + List allContainers = + (List) getAllContainers.invoke(preemptableContainers); + assertEquals(iterations, allContainers.size()); + } + + private static void invokeAddContainer(Method addContainer, + Object preemptableContainers, int index) { + try { + addContainer.invoke(preemptableContainers, createContainer(index), + ApplicationId.newInstance(1L, index)); + } catch (Exception e) { + throw new RuntimeException(e); + } + } + + private static void invokeGetAllContainers(Method getAllContainers, + Object preemptableContainers) { + try { + getAllContainers.invoke(preemptableContainers); + } catch (Exception e) { + throw new RuntimeException(e); + } + } + + private static RMContainer createContainer(int index) { + RMContainer container = Mockito.mock(RMContainer.class); + when(container.isAMContainer()).thenReturn(false); + Resource resource = Resources.createResource(1); + when(container.getAllocatedResource()).thenReturn(resource); + when(container.getContainerId()).thenReturn(null); + when(container.getNodeId()).thenReturn(null); + return container; + } + + private static void await(CountDownLatch start) { + try { + start.await(); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + throw new RuntimeException(e); + } + } +}