Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -2116,4 +2116,8 @@ public class DFSConfigKeys extends CommonConfigurationKeys {
*/
public static final String SUPPORTED_PACKAGES_CONFIG_NAME =
"dfs.nodeplan.steps.supported.packages";

/**
 * Expiration time, in milliseconds from insertion, for the cached datanode
 * lists served to metrics/JMX callers (see
 * DatanodeManager#getDatanodeListForReportWithCache). A value of 0 or less
 * disables the cache, so every call computes a fresh list.
 */
public static final String DFS_NAMENODE_DATANODE_LIST_CACHE_EXPIRATION_MS_KEY =
"dfs.namenode.datanode.list.cache.expiration.ms";
// Default 0: caching disabled unless explicitly configured.
public static final long DFS_NAMENODE_DATANODE_LIST_CACHE_EXPIRATION_MS_DEFAULT = 0;
}
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,8 @@
import org.apache.hadoop.classification.VisibleForTesting;
import org.apache.hadoop.util.Preconditions;

import org.apache.hadoop.thirdparty.com.google.common.cache.Cache;
import org.apache.hadoop.thirdparty.com.google.common.cache.CacheBuilder;
import org.apache.hadoop.thirdparty.com.google.common.collect.ImmutableList;
import org.apache.hadoop.thirdparty.com.google.common.net.InetAddresses;

Expand Down Expand Up @@ -70,6 +72,7 @@
import java.net.InetSocketAddress;
import java.net.UnknownHostException;
import java.util.*;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;
Expand Down Expand Up @@ -230,6 +233,9 @@ public class DatanodeManager {

private final boolean randomNodeOrderEnabled;

/** Cached map of DatanodeReportType -> list of DatanodeDescriptor for metrics purposes. */
private Cache<DatanodeReportType, List<DatanodeDescriptor>> datanodeListSnapshots = null;

DatanodeManager(final BlockManager blockManager, final Namesystem namesystem,
final Configuration conf) throws IOException {
this.namesystem = namesystem;
Expand Down Expand Up @@ -364,6 +370,17 @@ public class DatanodeManager {
this.randomNodeOrderEnabled = conf.getBoolean(
DFSConfigKeys.DFS_NAMENODE_RANDOM_NODE_ORDER_ENABLED,
DFSConfigKeys.DFS_NAMENODE_RANDOM_NODE_ORDER_ENABLED_DEFAULT);

long datanodeListCacheExpirationMs =
conf.getLong(DFSConfigKeys.DFS_NAMENODE_DATANODE_LIST_CACHE_EXPIRATION_MS_KEY,
DFSConfigKeys.DFS_NAMENODE_DATANODE_LIST_CACHE_EXPIRATION_MS_DEFAULT);
if (datanodeListCacheExpirationMs > 0) {
LOG.info("Using cached DN list for metrics, expiration time = {} ms.",
datanodeListCacheExpirationMs);
datanodeListSnapshots = CacheBuilder.newBuilder()
.expireAfterWrite(datanodeListCacheExpirationMs, TimeUnit.MILLISECONDS)
.build();
}
}

/**
Expand Down Expand Up @@ -963,6 +980,11 @@ private void wipeDatanode(final DatanodeID node) {
synchronized (this) {
host2DatanodeMap.remove(datanodeMap.remove(key));
}
Cache<DatanodeReportType, List<DatanodeDescriptor>> tmpDatanodeListSnapshots =
datanodeListSnapshots;
if (tmpDatanodeListSnapshots != null) {
tmpDatanodeListSnapshots.invalidateAll();
}
if (LOG.isDebugEnabled()) {
LOG.debug("{}.wipeDatanode({}): storage {} is removed from datanodeMap.",
getClass().getSimpleName(), node, key);
Expand Down Expand Up @@ -1438,7 +1460,7 @@ public int getNumLiveDataNodes() {

/**
 * @return the number of dead datanodes, possibly computed from a cached
 * (slightly stale) snapshot when the datanode-list cache is enabled.
 */
public int getNumDeadDataNodes() {
  // Uses the cache-aware variant: this counter feeds metrics, where a
  // slightly stale value is acceptable.
  return getDatanodeListForReportWithCache(DatanodeReportType.DEAD).size();
}

/** @return the number of datanodes. */
Expand All @@ -1453,12 +1475,12 @@ public List<DatanodeDescriptor> getDecommissioningNodes() {
// There is no need to take namesystem reader lock as
// getDatanodeListForReport will synchronize on datanodeMap
// A decommissioning DN may be "alive" or "dead".
return getDatanodeListForReport(DatanodeReportType.DECOMMISSIONING);
return getDatanodeListForReportWithCache(DatanodeReportType.DECOMMISSIONING);
}

/**
 * @return list of datanodes that are entering maintenance, possibly served
 * from a cached (slightly stale) snapshot when the datanode-list cache is
 * enabled.
 */
public List<DatanodeDescriptor> getEnteringMaintenanceNodes() {
  return getDatanodeListForReportWithCache(DatanodeReportType.ENTERING_MAINTENANCE);
}

/* Getter and Setter for stale DataNodes related attributes */
Expand Down Expand Up @@ -1532,17 +1554,35 @@ void setNumStaleStorages(int numStaleStorages) {
this.numStaleStorages = numStaleStorages;
}

/** Fetch live and dead datanodes. */
public void fetchDatanodes(final List<DatanodeDescriptor> live,
public void fetchDatanodes(final List<DatanodeDescriptor> live,
final List<DatanodeDescriptor> dead, final boolean removeDecommissionNode) {
fetchDatanodes(live, dead, removeDecommissionNode, false);
}

/**
 * Fetches live and dead datanodes, preferring the cached snapshot when the
 * datanode-list cache is enabled via
 * dfs.namenode.datanode.list.cache.expiration.ms; generates and caches the
 * result on a cache miss. Results may be slightly stale, so this is intended
 * for low-impact callers such as metrics/JMX.
 *
 * @param live output list populated with live datanodes; may be null if only
 *        dead nodes are wanted (at least one of live/dead must be non-null).
 * @param dead output list populated with dead datanodes; may be null if only
 *        live nodes are wanted.
 * @param removeDecommissionNode NOTE(review): presumably filters
 *        decommissioned nodes out of the results -- confirm in the private
 *        overload.
 */
public void fetchDatanodesWithCache(final List<DatanodeDescriptor> live,
final List<DatanodeDescriptor> dead, final boolean removeDecommissionNode) {
fetchDatanodes(live, dead, removeDecommissionNode, true);
}

/** Fetch live and dead datanodes. */
private void fetchDatanodes(final List<DatanodeDescriptor> live,
final List<DatanodeDescriptor> dead, final boolean removeDecommissionNode, boolean useCache) {
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

please add a method without many changes.

fetchDatanodes(final List<DatanodeDescriptor> live, 
      final List<DatanodeDescriptor> dead, final boolean removeDecommissionNode) {
  fetchDatanodes(live, dead, removeDecommissionNode, false);
}

if (live == null && dead == null) {
throw new HadoopIllegalArgumentException("Both live and dead lists are null");
}

// There is no need to take namesystem reader lock as
// getDatanodeListForReport will synchronize on datanodeMap
final List<DatanodeDescriptor> results =
getDatanodeListForReport(DatanodeReportType.ALL);
List<DatanodeDescriptor> results;
if (useCache) {
results = getDatanodeListForReportWithCache(DatanodeReportType.ALL);
} else {
// There is no need to take namesystem reader lock as
// getDatanodeListForReport will synchronize on datanodeMap
results = getDatanodeListForReport(DatanodeReportType.ALL);
}
for(DatanodeDescriptor node : results) {
if (isDatanodeDead(node)) {
if (dead != null) {
Expand Down Expand Up @@ -1635,6 +1675,25 @@ private DatanodeID parseDNFromHostsEntry(String hostLine) {
return dnId;
}

/**
 * Variant of {@link #getDatanodeListForReport} that may serve a cached,
 * possibly stale snapshot; intended for low-impact callers such as metrics.
 * Falls back to a freshly computed report when caching is disabled or the
 * cache lookup fails with an {@link ExecutionException}.
 * NOTE(review): a RuntimeException from the loader would surface as Guava's
 * unchecked wrapper rather than ExecutionException -- confirm that is the
 * intended propagation.
 */
public List<DatanodeDescriptor> getDatanodeListForReportWithCache(
    final DatanodeReportType type) {
  // Capture the field once so the null check and the lookup operate on the
  // same cache reference.
  final Cache<DatanodeReportType, List<DatanodeDescriptor>> snapshots =
      datanodeListSnapshots;
  if (snapshots != null) {
    try {
      return snapshots.get(type, () -> getDatanodeListForReport(type));
    } catch (ExecutionException e) {
      // Cache load failed; fall through to an uncached report below.
    }
  }
  // Caching disabled (null cache) or the lookup failed: compute fresh.
  return getDatanodeListForReport(type);
}

/** For generating datanode reports */
public List<DatanodeDescriptor> getDatanodeListForReport(
final DatanodeReportType type) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5789,8 +5789,8 @@ public int getNumDeadDataNodes() {
@Metric({"NumDecomLiveDataNodes",
"Number of datanodes which have been decommissioned and are now live"})
public int getNumDecomLiveDataNodes() {
final List<DatanodeDescriptor> live = new ArrayList<DatanodeDescriptor>();
getBlockManager().getDatanodeManager().fetchDatanodes(live, null, false);
final List<DatanodeDescriptor> live = new ArrayList<>();
getBlockManager().getDatanodeManager().fetchDatanodesWithCache(live, null, false);
int liveDecommissioned = 0;
for (DatanodeDescriptor node : live) {
liveDecommissioned += node.isDecommissioned() ? 1 : 0;
Expand All @@ -5802,8 +5802,8 @@ public int getNumDecomLiveDataNodes() {
@Metric({"NumDecomDeadDataNodes",
"Number of datanodes which have been decommissioned and are now dead"})
public int getNumDecomDeadDataNodes() {
final List<DatanodeDescriptor> dead = new ArrayList<DatanodeDescriptor>();
getBlockManager().getDatanodeManager().fetchDatanodes(null, dead, false);
final List<DatanodeDescriptor> dead = new ArrayList<>();
getBlockManager().getDatanodeManager().fetchDatanodesWithCache(null, dead, false);
int deadDecommissioned = 0;
for (DatanodeDescriptor node : dead) {
deadDecommissioned += node.isDecommissioned() ? 1 : 0;
Expand All @@ -5815,8 +5815,8 @@ public int getNumDecomDeadDataNodes() {
@Metric({"NumInServiceLiveDataNodes",
"Number of live datanodes which are currently in service"})
public int getNumInServiceLiveDataNodes() {
final List<DatanodeDescriptor> live = new ArrayList<DatanodeDescriptor>();
getBlockManager().getDatanodeManager().fetchDatanodes(live, null, true);
final List<DatanodeDescriptor> live = new ArrayList<>();
getBlockManager().getDatanodeManager().fetchDatanodesWithCache(live, null, true);
int liveInService = live.size();
for (DatanodeDescriptor node : live) {
liveInService -= node.isInMaintenance() ? 1 : 0;
Expand All @@ -5828,8 +5828,8 @@ public int getNumInServiceLiveDataNodes() {
@Metric({"VolumeFailuresTotal",
"Total number of volume failures across all Datanodes"})
public int getVolumeFailuresTotal() {
List<DatanodeDescriptor> live = new ArrayList<DatanodeDescriptor>();
getBlockManager().getDatanodeManager().fetchDatanodes(live, null, false);
List<DatanodeDescriptor> live = new ArrayList<>();
getBlockManager().getDatanodeManager().fetchDatanodesWithCache(live, null, false);
int volumeFailuresTotal = 0;
for (DatanodeDescriptor node: live) {
volumeFailuresTotal += node.getVolumeFailures();
Expand All @@ -5841,8 +5841,8 @@ public int getVolumeFailuresTotal() {
@Metric({"EstimatedCapacityLostTotal",
"An estimate of the total capacity lost due to volume failures"})
public long getEstimatedCapacityLostTotal() {
List<DatanodeDescriptor> live = new ArrayList<DatanodeDescriptor>();
getBlockManager().getDatanodeManager().fetchDatanodes(live, null, false);
List<DatanodeDescriptor> live = new ArrayList<>();
getBlockManager().getDatanodeManager().fetchDatanodesWithCache(live, null, false);
long estimatedCapacityLostTotal = 0;
for (DatanodeDescriptor node: live) {
VolumeFailureSummary volumeFailureSummary = node.getVolumeFailureSummary();
Expand Down Expand Up @@ -6731,10 +6731,9 @@ public int getThreads() {
*/
@Override // NameNodeMXBean
public String getLiveNodes() {
final Map<String, Map<String,Object>> info =
new HashMap<String, Map<String,Object>>();
final List<DatanodeDescriptor> live = new ArrayList<DatanodeDescriptor>();
blockManager.getDatanodeManager().fetchDatanodes(live, null, false);
final Map<String, Map<String, Object>> info = new HashMap<>();
final List<DatanodeDescriptor> live = new ArrayList<>();
blockManager.getDatanodeManager().fetchDatanodesWithCache(live, null, false);
for (DatanodeDescriptor node : live) {
ImmutableMap.Builder<String, Object> innerinfo =
ImmutableMap.<String,Object>builder();
Expand Down Expand Up @@ -6786,10 +6785,9 @@ public String getLiveNodes() {
*/
@Override // NameNodeMXBean
public String getDeadNodes() {
final Map<String, Map<String, Object>> info =
new HashMap<String, Map<String, Object>>();
final List<DatanodeDescriptor> dead = new ArrayList<DatanodeDescriptor>();
blockManager.getDatanodeManager().fetchDatanodes(null, dead, false);
final Map<String, Map<String, Object>> info = new HashMap<>();
final List<DatanodeDescriptor> dead = new ArrayList<>();
blockManager.getDatanodeManager().fetchDatanodesWithCache(null, dead, false);
for (DatanodeDescriptor node : dead) {
Map<String, Object> innerinfo = ImmutableMap.<String, Object>builder()
.put("lastContact", getLastContact(node))
Expand Down Expand Up @@ -6917,10 +6915,9 @@ public String getNodeUsage() {
float min = 0;
float dev = 0;

final Map<String, Map<String,Object>> info =
new HashMap<String, Map<String,Object>>();
final List<DatanodeDescriptor> live = new ArrayList<DatanodeDescriptor>();
blockManager.getDatanodeManager().fetchDatanodes(live, null, true);
final Map<String, Map<String, Object>> info = new HashMap<>();
final List<DatanodeDescriptor> live = new ArrayList<>();
blockManager.getDatanodeManager().fetchDatanodesWithCache(live, null, true);
for (Iterator<DatanodeDescriptor> it = live.iterator(); it.hasNext();) {
DatanodeDescriptor node = it.next();
if (!node.isInService()) {
Expand Down Expand Up @@ -9095,8 +9092,8 @@ public long getBytesInFuture() {
@Metric({"NumInMaintenanceLiveDataNodes",
"Number of live Datanodes which are in maintenance state"})
public int getNumInMaintenanceLiveDataNodes() {
final List<DatanodeDescriptor> live = new ArrayList<DatanodeDescriptor>();
getBlockManager().getDatanodeManager().fetchDatanodes(live, null, true);
final List<DatanodeDescriptor> live = new ArrayList<>();
getBlockManager().getDatanodeManager().fetchDatanodesWithCache(live, null, true);
int liveInMaintenance = 0;
for (DatanodeDescriptor node : live) {
liveInMaintenance += node.isInMaintenance() ? 1 : 0;
Expand All @@ -9108,8 +9105,8 @@ public int getNumInMaintenanceLiveDataNodes() {
@Metric({"NumInMaintenanceDeadDataNodes",
"Number of dead Datanodes which are in maintenance state"})
public int getNumInMaintenanceDeadDataNodes() {
final List<DatanodeDescriptor> dead = new ArrayList<DatanodeDescriptor>();
getBlockManager().getDatanodeManager().fetchDatanodes(null, dead, true);
final List<DatanodeDescriptor> dead = new ArrayList<>();
getBlockManager().getDatanodeManager().fetchDatanodesWithCache(null, dead, true);
int deadInMaintenance = 0;
for (DatanodeDescriptor node : dead) {
deadInMaintenance += node.isInMaintenance() ? 1 : 0;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6720,4 +6720,14 @@
If you have more than one package, separate the packages using commas.
</description>
</property>
<property>
<name>dfs.namenode.datanode.list.cache.expiration.ms</name>
<value>0</value>
<description>
Expiration time in milliseconds, measured from insertion, for cached results of
DatanodeManager.getDatanodeListForReport. Set to a positive value to enable the
cache for performance; a value of 0 or less disables it.
Non-metrics usage (fsck, datanodeReport, etc.) bypasses this cache.
</description>
</property>
</configuration>
Loading