Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -35,4 +35,14 @@ public interface DataEncryptionKeyFactory {
* @throws IOException for any error
*/
DataEncryptionKey newDataEncryptionKey() throws IOException;

/**
* Clear the cached data encryption key, so that a new key will be
* generated on the next call to {@link #newDataEncryptionKey()}.
* This is called when an InvalidEncryptionKeyException is received
* to force a key refresh on retry.
*/
default void clearDataEncryptionKey() {
// no-op by default
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,7 @@
import org.apache.hadoop.hdfs.protocol.datatransfer.BlockPinningException;
import org.apache.hadoop.hdfs.protocol.datatransfer.DataTransferProtoUtil;
import org.apache.hadoop.hdfs.protocol.datatransfer.IOStreamPair;
import org.apache.hadoop.hdfs.protocol.datatransfer.InvalidEncryptionKeyException;
import org.apache.hadoop.hdfs.protocol.datatransfer.Sender;
import org.apache.hadoop.hdfs.protocol.datatransfer.TrustedChannelResolver;
import org.apache.hadoop.hdfs.protocol.datatransfer.sasl.DataTransferSaslUtil;
Expand Down Expand Up @@ -358,7 +359,8 @@ private boolean addTo(StorageGroup g) {
}

/** Dispatch the move to the proxy source & wait for the response. */
private void dispatch() {
@VisibleForTesting
void dispatch() {
Socket sock = new Socket();
DataOutputStream out = null;
DataInputStream in = null;
Expand All @@ -372,29 +374,54 @@ private void dispatch() {
LOG.info("Start moving " + this);
assert !(reportedBlock instanceof DBlockStriped);

sock.connect(
NetUtils.createSocketAddr(target.getDatanodeInfo().
getXferAddr(Dispatcher.this.connectToDnViaHostname)),
HdfsConstants.READ_TIMEOUT);

// Set read timeout so that it doesn't hang forever against
// unresponsive nodes. Datanode normally sends IN_PROGRESS response
// twice within the client read timeout period (every 30 seconds by
// default). Here, we make it give up after 5 minutes of no response.
sock.setSoTimeout(HdfsConstants.READ_TIMEOUT * 5);
sock.setKeepAlive(true);

OutputStream unbufOut = sock.getOutputStream();
InputStream unbufIn = sock.getInputStream();
ExtendedBlock eb = new ExtendedBlock(nnc.getBlockpoolID(),
reportedBlock.getBlock());
final KeyManager km = nnc.getKeyManager();
final KeyManager km = nnc.getKeyManager();
Token<BlockTokenIdentifier> accessToken = km.getAccessToken(eb,
new StorageType[]{target.storageType}, new String[0]);
IOStreamPair saslStreams = saslClient.socketSend(sock, unbufOut,
unbufIn, km, accessToken, target.getDatanodeInfo());
unbufOut = saslStreams.out;
unbufIn = saslStreams.in;
OutputStream unbufOut;
InputStream unbufIn;
int encryptionKeyRetryCount = 0;
while (true) {
try {
sock.connect(
NetUtils.createSocketAddr(target.getDatanodeInfo().
getXferAddr(Dispatcher.this.connectToDnViaHostname)),
HdfsConstants.READ_TIMEOUT);

// Set read timeout so that it doesn't hang forever against
// unresponsive nodes. Datanode normally sends IN_PROGRESS
// response twice within the client read timeout period (every
// 30 seconds by default). Here, we make it give up after 5
// minutes of no response.
sock.setSoTimeout(HdfsConstants.READ_TIMEOUT * 5);
sock.setKeepAlive(true);

unbufOut = sock.getOutputStream();
unbufIn = sock.getInputStream();
IOStreamPair saslStreams = saslClient.socketSend(sock, unbufOut,
unbufIn, km, accessToken, target.getDatanodeInfo());
unbufOut = saslStreams.out;
unbufIn = saslStreams.in;
break;
} catch (InvalidEncryptionKeyException e) {
IOUtils.closeSocket(sock);
if (++encryptionKeyRetryCount > 1) {
throw e;
}
LOG.info("Retrying connection to {} for block {} after "
+ "InvalidEncryptionKeyException, will wait before retry",
target.getDatanodeInfo(), reportedBlock.getBlock(), e);
km.clearDataEncryptionKey();
try {
Thread.sleep(HdfsConstants.READ_TIMEOUT);
} catch (InterruptedException ie) {
Thread.currentThread().interrupt();
throw e;
}
sock = new Socket();
}
}
out = new DataOutputStream(new BufferedOutputStream(unbufOut,
ioFileBufferSize));
in = new DataInputStream(new BufferedInputStream(unbufIn,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -148,6 +148,16 @@ public DataEncryptionKey newDataEncryptionKey() {
}
}

/**
* Clear the cached data encryption key, so that a new key will be generated
* on the next call to {@link #newDataEncryptionKey()}.
*/
@Override
public synchronized void clearDataEncryptionKey() {
LOG.debug("Clearing data encryption key");
encryptionKey = null;
}

@Override
public void close() {
shouldRun = false;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.net.InetSocketAddress;
import java.net.Socket;

import org.apache.hadoop.classification.InterfaceAudience;
Expand All @@ -37,6 +38,7 @@
import org.apache.hadoop.hdfs.protocol.datatransfer.BlockPinningException;
import org.apache.hadoop.hdfs.protocol.datatransfer.DataTransferProtoUtil;
import org.apache.hadoop.hdfs.protocol.datatransfer.IOStreamPair;
import org.apache.hadoop.hdfs.protocol.datatransfer.InvalidEncryptionKeyException;
import org.apache.hadoop.hdfs.protocol.datatransfer.Sender;
import org.apache.hadoop.hdfs.protocol.datatransfer.sasl.DataEncryptionKeyFactory;
import org.apache.hadoop.hdfs.protocol.datatransfer.sasl.SaslDataTransferClient;
Expand Down Expand Up @@ -110,25 +112,48 @@ public BlockMovementStatus moveBlock(BlockMovingInfo blkMovingInfo,
DataOutputStream out = null;
DataInputStream in = null;
try {
NetUtils.connect(sock,
NetUtils.createSocketAddr(
blkMovingInfo.getTarget().getXferAddr(connectToDnViaHostname)),
socketTimeout);
// Set read timeout so that it doesn't hang forever against
// unresponsive nodes. Datanode normally sends IN_PROGRESS response
// twice within the client read timeout period (every 30 seconds by
// default). Here, we make it give up after "socketTimeout * 5" period
// of no response.
sock.setSoTimeout(socketTimeout * 5);
sock.setKeepAlive(true);
OutputStream unbufOut = sock.getOutputStream();
InputStream unbufIn = sock.getInputStream();
LOG.debug("Connecting to datanode {}", blkMovingInfo.getTarget());
InetSocketAddress targetAddr = NetUtils.createSocketAddr(
blkMovingInfo.getTarget().getXferAddr(connectToDnViaHostname));
OutputStream unbufOut;
InputStream unbufIn;
int encryptionKeyRetryCount = 0;
while (true) {
try {
NetUtils.connect(sock, targetAddr, socketTimeout);
// Set read timeout so that it doesn't hang forever against
// unresponsive nodes. Datanode normally sends IN_PROGRESS response
// twice within the client read timeout period (every 30 seconds by
// default). Here, we make it give up after "socketTimeout * 5"
// period of no response.
sock.setSoTimeout(socketTimeout * 5);
sock.setKeepAlive(true);
unbufOut = sock.getOutputStream();
unbufIn = sock.getInputStream();
LOG.debug("Connecting to datanode {}", blkMovingInfo.getTarget());

IOStreamPair saslStreams = saslClient.socketSend(sock, unbufOut,
unbufIn, km, accessToken, blkMovingInfo.getTarget());
unbufOut = saslStreams.out;
unbufIn = saslStreams.in;
IOStreamPair saslStreams = saslClient.socketSend(sock, unbufOut,
unbufIn, km, accessToken, blkMovingInfo.getTarget());
unbufOut = saslStreams.out;
unbufIn = saslStreams.in;
break;
} catch (InvalidEncryptionKeyException e) {
IOUtils.closeSocket(sock);
if (++encryptionKeyRetryCount > 1) {
throw e;
}
LOG.info("Retrying connection to {} for block {} after "
+ "InvalidEncryptionKeyException, will wait before retry",
blkMovingInfo.getTarget(), blkMovingInfo.getBlock(), e);
km.clearDataEncryptionKey();
try {
Thread.sleep(socketTimeout);
} catch (InterruptedException ie) {
Thread.currentThread().interrupt();
throw e;
}
sock = new Socket();
}
}
out = new DataOutputStream(
new BufferedOutputStream(unbufOut, ioFileBufferSize));
in = new DataInputStream(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -192,6 +192,7 @@
import org.apache.hadoop.hdfs.protocol.datatransfer.BlockConstructionStage;
import org.apache.hadoop.hdfs.protocol.datatransfer.DataTransferProtocol;
import org.apache.hadoop.hdfs.protocol.datatransfer.IOStreamPair;
import org.apache.hadoop.hdfs.protocol.datatransfer.InvalidEncryptionKeyException;
import org.apache.hadoop.hdfs.protocol.datatransfer.PipelineAck;
import org.apache.hadoop.hdfs.protocol.datatransfer.Sender;
import org.apache.hadoop.hdfs.protocol.datatransfer.sasl.DataEncryptionKeyFactory;
Expand Down Expand Up @@ -3000,7 +3001,8 @@ CHECKSUM_SIZE depends on CHECKSUM_TYPE (usually, 4 for CRC32)
* Used for transferring a block of data. This class
* sends a piece of data to another DataNode.
*/
private class DataTransfer implements Runnable {
@VisibleForTesting
class DataTransfer implements Runnable {
final DatanodeInfo[] targets;
final StorageType[] targetStorageTypes;
final private String[] targetStorageIds;
Expand Down Expand Up @@ -3060,10 +3062,6 @@ public void run() {
final String dnAddr = targets[0].getXferAddr(connectToDnViaHostname);
InetSocketAddress curTarget = NetUtils.createSocketAddr(dnAddr);
LOG.debug("Connecting to datanode {}", dnAddr);
sock = newSocket();
NetUtils.connect(sock, curTarget, dnConf.socketTimeout);
sock.setTcpNoDelay(dnConf.getDataTransferServerTcpNoDelay());
sock.setSoTimeout(targets.length * dnConf.socketTimeout);

//
// Header info
Expand All @@ -3072,17 +3070,46 @@ public void run() {
EnumSet.of(BlockTokenIdentifier.AccessMode.WRITE),
targetStorageTypes, targetStorageIds);

long writeTimeout = dnConf.socketWriteTimeout +
long writeTimeout = dnConf.socketWriteTimeout +
HdfsConstants.WRITE_TIMEOUT_EXTENSION * (targets.length-1);
OutputStream unbufOut = NetUtils.getOutputStream(sock, writeTimeout);
InputStream unbufIn = NetUtils.getInputStream(sock);
DataEncryptionKeyFactory keyFactory =
getDataEncryptionKeyFactoryForBlock(b);
IOStreamPair saslStreams = saslClient.socketSend(sock, unbufOut,
unbufIn, keyFactory, accessToken, bpReg);
unbufOut = saslStreams.out;
unbufIn = saslStreams.in;

OutputStream unbufOut;
InputStream unbufIn;
int encryptionKeyRetryCount = 0;
while (true) {
try {
sock = newSocket();
NetUtils.connect(sock, curTarget, dnConf.socketTimeout);
sock.setTcpNoDelay(dnConf.getDataTransferServerTcpNoDelay());
sock.setSoTimeout(targets.length * dnConf.socketTimeout);

unbufOut = NetUtils.getOutputStream(sock, writeTimeout);
unbufIn = NetUtils.getInputStream(sock);
IOStreamPair saslStreams = saslClient.socketSend(sock, unbufOut,
unbufIn, keyFactory, accessToken, bpReg);
unbufOut = saslStreams.out;
unbufIn = saslStreams.in;
break;
} catch (InvalidEncryptionKeyException e) {
IOUtils.closeSocket(sock);
sock = null;
if (++encryptionKeyRetryCount > 1) {
throw e;
}
LOG.info("Retrying connection to {} for block {} after "
+ "InvalidEncryptionKeyException, will wait before retry",
curTarget, b, e);
keyFactory.clearDataEncryptionKey();
try {
Thread.sleep(dnConf.socketTimeout);
} catch (InterruptedException ie) {
Thread.currentThread().interrupt();
throw e;
}
}
}

out = new DataOutputStream(new BufferedOutputStream(unbufOut,
DFSUtilClient.getSmallBufferSize(getConf())));
in = new DataInputStream(unbufIn);
Expand Down
Loading
Loading