From 0fde26eb8503805d9cdbf2aa9e41f30d5f109f35 Mon Sep 17 00:00:00 2001 From: ZhenyuLi <893652269@qq.com> Date: Thu, 26 Mar 2026 10:20:43 -0400 Subject: [PATCH] HDFS-17899. Handle InvalidEncryptionKeyException in Balancer Dispatcher, SPS BlockDispatcher and DataNode DataTransfer --- .../sasl/DataEncryptionKeyFactory.java | 10 ++ .../hdfs/server/balancer/Dispatcher.java | 67 ++++++--- .../hdfs/server/balancer/KeyManager.java | 10 ++ .../server/common/sps/BlockDispatcher.java | 61 +++++--- .../hadoop/hdfs/server/datanode/DataNode.java | 53 +++++-- .../balancer/TestDispatcherEncryptionKey.java | 137 ++++++++++++++++++ .../hdfs/server/balancer/TestKeyManager.java | 48 ++++++ .../common/sps/TestBlockDispatcher.java | 95 ++++++++++++ .../TestDataTransferEncryptionKey.java | 136 +++++++++++++++++ 9 files changed, 566 insertions(+), 51 deletions(-) create mode 100644 hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/balancer/TestDispatcherEncryptionKey.java create mode 100644 hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/common/sps/TestBlockDispatcher.java create mode 100644 hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDataTransferEncryptionKey.java diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/sasl/DataEncryptionKeyFactory.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/sasl/DataEncryptionKeyFactory.java index 959cba0fb48d6..a1674232fd78e 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/sasl/DataEncryptionKeyFactory.java +++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/sasl/DataEncryptionKeyFactory.java @@ -35,4 +35,14 @@ public interface DataEncryptionKeyFactory { * @throws IOException for any error */ DataEncryptionKey newDataEncryptionKey() throws IOException; + + /** + * Clear the cached data encryption key, so that a new key will be + * generated on the next call to {@link #newDataEncryptionKey()}. + * This is called when an InvalidEncryptionKeyException is received + * to force a key refresh on retry. + */ + default void clearDataEncryptionKey() { + // no-op by default + } } diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/balancer/Dispatcher.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/balancer/Dispatcher.java index acac65d774505..849327e15c541 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/balancer/Dispatcher.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/balancer/Dispatcher.java @@ -63,6 +63,7 @@ import org.apache.hadoop.hdfs.protocol.datatransfer.BlockPinningException; import org.apache.hadoop.hdfs.protocol.datatransfer.DataTransferProtoUtil; import org.apache.hadoop.hdfs.protocol.datatransfer.IOStreamPair; +import org.apache.hadoop.hdfs.protocol.datatransfer.InvalidEncryptionKeyException; import org.apache.hadoop.hdfs.protocol.datatransfer.Sender; import org.apache.hadoop.hdfs.protocol.datatransfer.TrustedChannelResolver; import org.apache.hadoop.hdfs.protocol.datatransfer.sasl.DataTransferSaslUtil; @@ -358,7 +359,8 @@ private boolean addTo(StorageGroup g) { } /** Dispatch the move to the proxy source & wait for the response. */ - private void dispatch() { + @VisibleForTesting + void dispatch() { Socket sock = new Socket(); DataOutputStream out = null; DataInputStream in = null; @@ -372,29 +374,54 @@ private void dispatch() { LOG.info("Start moving " + this); assert !(reportedBlock instanceof DBlockStriped); - sock.connect( - NetUtils.createSocketAddr(target.getDatanodeInfo(). - getXferAddr(Dispatcher.this.connectToDnViaHostname)), - HdfsConstants.READ_TIMEOUT); - - // Set read timeout so that it doesn't hang forever against - // unresponsive nodes. Datanode normally sends IN_PROGRESS response - // twice within the client read timeout period (every 30 seconds by - // default). Here, we make it give up after 5 minutes of no response. - sock.setSoTimeout(HdfsConstants.READ_TIMEOUT * 5); - sock.setKeepAlive(true); - - OutputStream unbufOut = sock.getOutputStream(); - InputStream unbufIn = sock.getInputStream(); ExtendedBlock eb = new ExtendedBlock(nnc.getBlockpoolID(), reportedBlock.getBlock()); - final KeyManager km = nnc.getKeyManager(); + final KeyManager km = nnc.getKeyManager(); Token accessToken = km.getAccessToken(eb, new StorageType[]{target.storageType}, new String[0]); - IOStreamPair saslStreams = saslClient.socketSend(sock, unbufOut, - unbufIn, km, accessToken, target.getDatanodeInfo()); - unbufOut = saslStreams.out; - unbufIn = saslStreams.in; + OutputStream unbufOut; + InputStream unbufIn; + int encryptionKeyRetryCount = 0; + while (true) { + try { + sock.connect( + NetUtils.createSocketAddr(target.getDatanodeInfo(). + getXferAddr(Dispatcher.this.connectToDnViaHostname)), + HdfsConstants.READ_TIMEOUT); + + // Set read timeout so that it doesn't hang forever against + // unresponsive nodes. Datanode normally sends IN_PROGRESS + // response twice within the client read timeout period (every + // 30 seconds by default). Here, we make it give up after 5 + // minutes of no response. + sock.setSoTimeout(HdfsConstants.READ_TIMEOUT * 5); + sock.setKeepAlive(true); + + unbufOut = sock.getOutputStream(); + unbufIn = sock.getInputStream(); + IOStreamPair saslStreams = saslClient.socketSend(sock, unbufOut, + unbufIn, km, accessToken, target.getDatanodeInfo()); + unbufOut = saslStreams.out; + unbufIn = saslStreams.in; + break; + } catch (InvalidEncryptionKeyException e) { + IOUtils.closeSocket(sock); + if (++encryptionKeyRetryCount > 1) { + throw e; + } + LOG.info("Retrying connection to {} for block {} after " + + "InvalidEncryptionKeyException, will wait before retry", + target.getDatanodeInfo(), reportedBlock.getBlock(), e); + km.clearDataEncryptionKey(); + try { + Thread.sleep(HdfsConstants.READ_TIMEOUT); + } catch (InterruptedException ie) { + Thread.currentThread().interrupt(); + throw e; + } + sock = new Socket(); + } + } out = new DataOutputStream(new BufferedOutputStream(unbufOut, ioFileBufferSize)); in = new DataInputStream(new BufferedInputStream(unbufIn, diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/balancer/KeyManager.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/balancer/KeyManager.java index 5644ef7d7da2f..74560e96a68f2 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/balancer/KeyManager.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/balancer/KeyManager.java @@ -148,6 +148,16 @@ public DataEncryptionKey newDataEncryptionKey() { } } + /** + * Clear the cached data encryption key, so that a new key will be generated + * on the next call to {@link #newDataEncryptionKey()}. + */ + @Override + public synchronized void clearDataEncryptionKey() { + LOG.debug("Clearing data encryption key"); + encryptionKey = null; + } + @Override public void close() { shouldRun = false; diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/common/sps/BlockDispatcher.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/common/sps/BlockDispatcher.java index f7756c74851a6..d0875e9cb07a8 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/common/sps/BlockDispatcher.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/common/sps/BlockDispatcher.java @@ -27,6 +27,7 @@ import java.io.IOException; import java.io.InputStream; import java.io.OutputStream; +import java.net.InetSocketAddress; import java.net.Socket; import org.apache.hadoop.classification.InterfaceAudience; @@ -37,6 +38,7 @@ import org.apache.hadoop.hdfs.protocol.datatransfer.BlockPinningException; import org.apache.hadoop.hdfs.protocol.datatransfer.DataTransferProtoUtil; import org.apache.hadoop.hdfs.protocol.datatransfer.IOStreamPair; +import org.apache.hadoop.hdfs.protocol.datatransfer.InvalidEncryptionKeyException; import org.apache.hadoop.hdfs.protocol.datatransfer.Sender; import org.apache.hadoop.hdfs.protocol.datatransfer.sasl.DataEncryptionKeyFactory; import org.apache.hadoop.hdfs.protocol.datatransfer.sasl.SaslDataTransferClient; @@ -110,25 +112,48 @@ public BlockMovementStatus moveBlock(BlockMovingInfo blkMovingInfo, DataOutputStream out = null; DataInputStream in = null; try { - NetUtils.connect(sock, - NetUtils.createSocketAddr( - blkMovingInfo.getTarget().getXferAddr(connectToDnViaHostname)), - socketTimeout); - // Set read timeout so that it doesn't hang forever against - // unresponsive nodes. Datanode normally sends IN_PROGRESS response - // twice within the client read timeout period (every 30 seconds by - // default). Here, we make it give up after "socketTimeout * 5" period - // of no response. - sock.setSoTimeout(socketTimeout * 5); - sock.setKeepAlive(true); - OutputStream unbufOut = sock.getOutputStream(); - InputStream unbufIn = sock.getInputStream(); - LOG.debug("Connecting to datanode {}", blkMovingInfo.getTarget()); + InetSocketAddress targetAddr = NetUtils.createSocketAddr( + blkMovingInfo.getTarget().getXferAddr(connectToDnViaHostname)); + OutputStream unbufOut; + InputStream unbufIn; + int encryptionKeyRetryCount = 0; + while (true) { + try { + NetUtils.connect(sock, targetAddr, socketTimeout); + // Set read timeout so that it doesn't hang forever against + // unresponsive nodes. Datanode normally sends IN_PROGRESS response + // twice within the client read timeout period (every 30 seconds by + // default). Here, we make it give up after "socketTimeout * 5" + // period of no response. + sock.setSoTimeout(socketTimeout * 5); + sock.setKeepAlive(true); + unbufOut = sock.getOutputStream(); + unbufIn = sock.getInputStream(); + LOG.debug("Connecting to datanode {}", blkMovingInfo.getTarget()); - IOStreamPair saslStreams = saslClient.socketSend(sock, unbufOut, - unbufIn, km, accessToken, blkMovingInfo.getTarget()); - unbufOut = saslStreams.out; - unbufIn = saslStreams.in; + IOStreamPair saslStreams = saslClient.socketSend(sock, unbufOut, + unbufIn, km, accessToken, blkMovingInfo.getTarget()); + unbufOut = saslStreams.out; + unbufIn = saslStreams.in; + break; + } catch (InvalidEncryptionKeyException e) { + IOUtils.closeSocket(sock); + if (++encryptionKeyRetryCount > 1) { + throw e; + } + LOG.info("Retrying connection to {} for block {} after " + + "InvalidEncryptionKeyException, will wait before retry", + blkMovingInfo.getTarget(), blkMovingInfo.getBlock(), e); + km.clearDataEncryptionKey(); + try { + Thread.sleep(socketTimeout); + } catch (InterruptedException ie) { + Thread.currentThread().interrupt(); + throw e; + } + sock = new Socket(); + } + } out = new DataOutputStream( new BufferedOutputStream(unbufOut, ioFileBufferSize)); in = new DataInputStream( diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java index 3a1b1e07f3682..6d6a629c012d1 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java @@ -192,6 +192,7 @@ import org.apache.hadoop.hdfs.protocol.datatransfer.BlockConstructionStage; import org.apache.hadoop.hdfs.protocol.datatransfer.DataTransferProtocol; import org.apache.hadoop.hdfs.protocol.datatransfer.IOStreamPair; +import org.apache.hadoop.hdfs.protocol.datatransfer.InvalidEncryptionKeyException; import org.apache.hadoop.hdfs.protocol.datatransfer.PipelineAck; import org.apache.hadoop.hdfs.protocol.datatransfer.Sender; import org.apache.hadoop.hdfs.protocol.datatransfer.sasl.DataEncryptionKeyFactory; @@ -3000,7 +3001,8 @@ CHECKSUM_SIZE depends on CHECKSUM_TYPE (usually, 4 for CRC32) * Used for transferring a block of data. This class * sends a piece of data to another DataNode. */ - private class DataTransfer implements Runnable { + @VisibleForTesting + class DataTransfer implements Runnable { final DatanodeInfo[] targets; final StorageType[] targetStorageTypes; final private String[] targetStorageIds; @@ -3060,10 +3062,6 @@ public void run() { final String dnAddr = targets[0].getXferAddr(connectToDnViaHostname); InetSocketAddress curTarget = NetUtils.createSocketAddr(dnAddr); LOG.debug("Connecting to datanode {}", dnAddr); - sock = newSocket(); - NetUtils.connect(sock, curTarget, dnConf.socketTimeout); - sock.setTcpNoDelay(dnConf.getDataTransferServerTcpNoDelay()); - sock.setSoTimeout(targets.length * dnConf.socketTimeout); // // Header info @@ -3072,17 +3070,46 @@ public void run() { EnumSet.of(BlockTokenIdentifier.AccessMode.WRITE), targetStorageTypes, targetStorageIds); - long writeTimeout = dnConf.socketWriteTimeout + + long writeTimeout = dnConf.socketWriteTimeout + HdfsConstants.WRITE_TIMEOUT_EXTENSION * (targets.length-1); - OutputStream unbufOut = NetUtils.getOutputStream(sock, writeTimeout); - InputStream unbufIn = NetUtils.getInputStream(sock); DataEncryptionKeyFactory keyFactory = getDataEncryptionKeyFactoryForBlock(b); - IOStreamPair saslStreams = saslClient.socketSend(sock, unbufOut, - unbufIn, keyFactory, accessToken, bpReg); - unbufOut = saslStreams.out; - unbufIn = saslStreams.in; - + OutputStream unbufOut; + InputStream unbufIn; + int encryptionKeyRetryCount = 0; + while (true) { + try { + sock = newSocket(); + NetUtils.connect(sock, curTarget, dnConf.socketTimeout); + sock.setTcpNoDelay(dnConf.getDataTransferServerTcpNoDelay()); + sock.setSoTimeout(targets.length * dnConf.socketTimeout); + + unbufOut = NetUtils.getOutputStream(sock, writeTimeout); + unbufIn = NetUtils.getInputStream(sock); + IOStreamPair saslStreams = saslClient.socketSend(sock, unbufOut, + unbufIn, keyFactory, accessToken, bpReg); + unbufOut = saslStreams.out; + unbufIn = saslStreams.in; + break; + } catch (InvalidEncryptionKeyException e) { + IOUtils.closeSocket(sock); + sock = null; + if (++encryptionKeyRetryCount > 1) { + throw e; + } + LOG.info("Retrying connection to {} for block {} after " + + "InvalidEncryptionKeyException, will wait before retry", + curTarget, b, e); + keyFactory.clearDataEncryptionKey(); + try { + Thread.sleep(dnConf.socketTimeout); + } catch (InterruptedException ie) { + Thread.currentThread().interrupt(); + throw e; + } + } + } + out = new DataOutputStream(new BufferedOutputStream(unbufOut, DFSUtilClient.getSmallBufferSize(getConf()))); in = new DataInputStream(unbufIn); diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/balancer/TestDispatcherEncryptionKey.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/balancer/TestDispatcherEncryptionKey.java new file mode 100644 index 0000000000000..ba7abb1edd59d --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/balancer/TestDispatcherEncryptionKey.java @@ -0,0 +1,137 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hdfs.server.balancer; + +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.ArgumentMatchers.anyBoolean; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.times; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; + +import java.lang.reflect.Constructor; +import java.net.ServerSocket; +import java.util.Collections; +import java.util.concurrent.atomic.AtomicLong; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.StorageType; +import org.apache.hadoop.hdfs.HdfsConfiguration; +import org.apache.hadoop.hdfs.protocol.Block; +import org.apache.hadoop.hdfs.protocol.DatanodeInfo; +import org.apache.hadoop.hdfs.protocol.datatransfer.InvalidEncryptionKeyException; +import org.apache.hadoop.hdfs.protocol.datatransfer.sasl.SaslDataTransferClient; +import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier; +import org.apache.hadoop.security.token.Token; +import org.apache.hadoop.test.Whitebox; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.Timeout; + +/** + * Test Dispatcher handling of InvalidEncryptionKeyException. + */ +@Timeout(120) +public class TestDispatcherEncryptionKey { + + /** + * Verify that when InvalidEncryptionKeyException is encountered during + * block dispatch, the dispatcher clears the cached encryption key + * before retry. + * + * Uses Thread.currentThread().interrupt() in the mock to avoid the + * 60-second sleep in the retry path, making the test fast. + */ + @Test + public void testClearEncryptionKeyOnRetry() throws Exception { + try (ServerSocket serverSocket = new ServerSocket(0)) { + int port = serverSocket.getLocalPort(); + + Configuration conf = new HdfsConfiguration(); + + // Mock NameNodeConnector + NameNodeConnector nnc = mock(NameNodeConnector.class); + when(nnc.getBlockpoolID()).thenReturn("bp-test"); + when(nnc.getBlocksFailed()).thenReturn(new AtomicLong(0)); + + // Mock KeyManager + KeyManager km = mock(KeyManager.class); + when(km.getAccessToken(any(), any(), any())) + .thenReturn(new Token()); + when(nnc.getKeyManager()).thenReturn(km); + + // Create Dispatcher with minimal config + Dispatcher dispatcher = new Dispatcher(nnc, Collections.emptySet(), + Collections.emptySet(), 1, 1, 0, + 1, 1, conf); + + // Inject mock saslClient that throws InvalidEncryptionKeyException + // and interrupts the current thread to skip the 60s sleep + SaslDataTransferClient saslClient = + mock(SaslDataTransferClient.class); + when(saslClient.socketSend( + any(), any(), any(), any(), any(), any(DatanodeInfo.class))) + .thenAnswer(invocation -> { + Thread.currentThread().interrupt(); + throw new InvalidEncryptionKeyException( + "test: encryption key expired"); + }); + Whitebox.setInternalState(dispatcher, "saslClient", saslClient); + + // Create real DDatanode via reflection (private constructor) + DatanodeInfo targetDN = mock(DatanodeInfo.class); + when(targetDN.getXferAddr(anyBoolean())) + .thenReturn("127.0.0.1:" + port); + Constructor ddnCtor = + Dispatcher.DDatanode.class.getDeclaredConstructor( + DatanodeInfo.class, int.class); + ddnCtor.setAccessible(true); + Dispatcher.DDatanode targetDdn = ddnCtor.newInstance(targetDN, 1); + + // Create real StorageGroup via addTarget (handles private ctor) + Dispatcher.DDatanode.StorageGroup target = + targetDdn.addTarget(StorageType.DISK, 1024L); + + // Mock Source - only need isIterationOver() + Dispatcher.Source source = mock(Dispatcher.Source.class); + when(source.isIterationOver()).thenReturn(false); + + // Create real DDatanode for proxySource + Dispatcher.DDatanode proxyDdn = + ddnCtor.newInstance(mock(DatanodeInfo.class), 1); + + // Create PendingMove and set its fields + Dispatcher.PendingMove move = + dispatcher.new PendingMove(source, target); + Dispatcher.DBlock reportedBlock = mock(Dispatcher.DBlock.class); + when(reportedBlock.getBlock()) + .thenReturn(new Block(1, 100, 1001)); + Whitebox.setInternalState(move, "reportedBlock", reportedBlock); + Whitebox.setInternalState(move, "proxySource", proxyDdn); + + // Call dispatch - it handles exceptions internally + move.dispatch(); + + // Clear the interrupt flag left by our mock + Thread.interrupted(); + + // Verify clearDataEncryptionKey was called exactly once. + // Without the fix, this verify will FAIL. + verify(km, times(1)).clearDataEncryptionKey(); + } + } +} \ No newline at end of file diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/balancer/TestKeyManager.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/balancer/TestKeyManager.java index 3d60a73510b79..a899200bd66a4 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/balancer/TestKeyManager.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/balancer/TestKeyManager.java @@ -29,6 +29,9 @@ import org.junit.jupiter.api.Timeout; import static org.junit.jupiter.api.Assertions.assertNotEquals; +import static org.junit.jupiter.api.Assertions.assertNotNull; +import static org.junit.jupiter.api.Assertions.assertNotSame; +import static org.junit.jupiter.api.Assertions.assertSame; import static org.junit.jupiter.api.Assertions.assertTrue; import static org.junit.jupiter.api.Assertions.assertEquals; import static org.mockito.Mockito.mock; @@ -82,4 +85,49 @@ public void testNewDataEncryptionKey() throws Exception { assertTrue(dekAfterExpiration.expiryDate > fakeTimer.now(), "KeyManager has an expired DataEncryptionKey!"); } + + @Test + public void testClearDataEncryptionKey() throws Exception { + final Configuration conf = new HdfsConfiguration(); + conf.setBoolean(DFSConfigKeys.DFS_ENCRYPT_DATA_TRANSFER_KEY, true); + conf.setBoolean(DFSConfigKeys.DFS_BLOCK_ACCESS_TOKEN_ENABLE_KEY, true); + + final long keyUpdateInterval = 2 * 1000; + final long tokenLifeTime = keyUpdateInterval; + final String blockPoolId = "bp-foo"; + FakeTimer fakeTimer = new FakeTimer(); + BlockTokenSecretManager btsm = new BlockTokenSecretManager( + keyUpdateInterval, tokenLifeTime, 0, 1, blockPoolId, null, false); + Whitebox.setInternalState(btsm, "timer", fakeTimer); + + NamenodeProtocol namenode = mock(NamenodeProtocol.class); + when(namenode.getBlockKeys()).thenReturn(btsm.exportKeys()); + + KeyManager keyManager = new KeyManager(blockPoolId, namenode, + true, conf); + Whitebox.setInternalState(keyManager, "timer", fakeTimer); + Whitebox.setInternalState( + Whitebox.getInternalState(keyManager, "blockTokenSecretManager"), + "timer", fakeTimer); + + // Get initial encryption key + final DataEncryptionKey dek1 = keyManager.newDataEncryptionKey(); + assertNotNull(dek1, "Encryption key should not be null"); + + // Same cached key should be returned when not expired + final DataEncryptionKey dek1Again = keyManager.newDataEncryptionKey(); + assertSame(dek1, dek1Again, + "Should return cached key when not expired"); + + // Clear the cached encryption key + keyManager.clearDataEncryptionKey(); + + // After clearing, a new key should be generated + final DataEncryptionKey dek2 = keyManager.newDataEncryptionKey(); + assertNotNull(dek2, "New encryption key should not be null"); + assertNotSame(dek1, dek2, + "Should generate a new key after clearing cached key"); + assertTrue(dek2.expiryDate > fakeTimer.now(), + "New encryption key should not be expired"); + } } \ No newline at end of file diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/common/sps/TestBlockDispatcher.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/common/sps/TestBlockDispatcher.java new file mode 100644 index 0000000000000..2fae0c6d6cb37 --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/common/sps/TestBlockDispatcher.java @@ -0,0 +1,95 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hdfs.server.common.sps; + +import static org.junit.jupiter.api.Assertions.assertThrows; +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.times; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; + +import java.net.ServerSocket; +import java.net.Socket; + +import org.apache.hadoop.fs.StorageType; +import org.apache.hadoop.hdfs.protocol.Block; +import org.apache.hadoop.hdfs.protocol.DatanodeInfo; +import org.apache.hadoop.hdfs.protocol.ExtendedBlock; +import org.apache.hadoop.hdfs.protocol.datatransfer.InvalidEncryptionKeyException; +import org.apache.hadoop.hdfs.protocol.datatransfer.sasl.DataEncryptionKeyFactory; +import org.apache.hadoop.hdfs.protocol.datatransfer.sasl.SaslDataTransferClient; +import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier; +import org.apache.hadoop.hdfs.server.protocol.BlockStorageMovementCommand.BlockMovingInfo; +import org.apache.hadoop.security.token.Token; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.Timeout; + +/** + * Test BlockDispatcher class. + */ +@Timeout(60) +public class TestBlockDispatcher { + + /** + * Verify that when InvalidEncryptionKeyException is encountered during + * block move, the dispatcher clears the cached encryption key before retry. + * + * This test will FAIL if the clearDataEncryptionKey() call is removed + * from the retry path. + */ + @Test + public void testClearEncryptionKeyOnRetry() throws Exception { + try (ServerSocket serverSocket = new ServerSocket(0)) { + int port = serverSocket.getLocalPort(); + + DatanodeInfo target = mock(DatanodeInfo.class); + when(target.getXferAddr(false)) + .thenReturn("127.0.0.1:" + port); + + DatanodeInfo source = mock(DatanodeInfo.class); + when(source.getDatanodeUuid()).thenReturn("uuid-src"); + + BlockMovingInfo blkMovingInfo = new BlockMovingInfo( + new Block(1, 100, 1001), + source, target, + StorageType.DISK, StorageType.ARCHIVE); + + // Mock saslClient to always throw InvalidEncryptionKeyException + SaslDataTransferClient saslClient = mock(SaslDataTransferClient.class); + when(saslClient.socketSend(any(), any(), any(), any(), any(), any())) + .thenThrow(new InvalidEncryptionKeyException( + "test: encryption key expired")); + + DataEncryptionKeyFactory km = mock(DataEncryptionKeyFactory.class); + ExtendedBlock eb = new ExtendedBlock("bp-1", 1, 100, 1001); + Token accessToken = new Token<>(); + + // Use small socketTimeout (100ms) to keep test fast + BlockDispatcher dispatcher = new BlockDispatcher(100, 1024, false); + + assertThrows(InvalidEncryptionKeyException.class, + () -> dispatcher.moveBlock(blkMovingInfo, saslClient, eb, + new Socket(), km, accessToken)); + + // Verify clearDataEncryptionKey was called exactly once during retry. + // Without the fix, this verify will FAIL. + verify(km, times(1)).clearDataEncryptionKey(); + } + } +} diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDataTransferEncryptionKey.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDataTransferEncryptionKey.java new file mode 100644 index 0000000000000..081085af60cca --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDataTransferEncryptionKey.java @@ -0,0 +1,136 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hdfs.server.datanode; + +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.ArgumentMatchers.anyBoolean; +import static org.mockito.Mockito.doNothing; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.times; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; + +import java.net.ServerSocket; +import java.net.Socket; + +import org.apache.hadoop.fs.StorageType; +import org.apache.hadoop.hdfs.protocol.DatanodeInfo; +import org.apache.hadoop.hdfs.protocol.ExtendedBlock; +import org.apache.hadoop.hdfs.protocol.datatransfer.BlockConstructionStage; +import org.apache.hadoop.hdfs.protocol.datatransfer.InvalidEncryptionKeyException; +import org.apache.hadoop.hdfs.protocol.datatransfer.sasl.DataEncryptionKeyFactory; +import org.apache.hadoop.hdfs.protocol.datatransfer.sasl.SaslDataTransferClient; +import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier; +import org.apache.hadoop.hdfs.server.protocol.DatanodeRegistration; +import org.apache.hadoop.security.token.Token; +import org.apache.hadoop.test.Whitebox; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.Timeout; + +/** + * Test DataNode.DataTransfer handling of InvalidEncryptionKeyException. + */ +@Timeout(60) +public class TestDataTransferEncryptionKey { + + /** + * Verify that when InvalidEncryptionKeyException is encountered during + * DataTransfer, the DataNode clears the cached encryption key before retry. + * + * This test will FAIL if the clearDataEncryptionKey() call is removed + * from the retry path. + */ + @Test + public void testClearEncryptionKeyOnRetry() throws Exception { + try (ServerSocket serverSocket = new ServerSocket(0)) { + int port = serverSocket.getLocalPort(); + + // Create mock DataNode - DataTransfer is a non-static inner class + DataNode mockDn = mock(DataNode.class); + + // Set up blockPoolManager field for constructor + BlockPoolManager mockBpm = mock(BlockPoolManager.class); + BPOfferService mockBpos = mock(BPOfferService.class); + DatanodeRegistration mockReg = mock(DatanodeRegistration.class); + Whitebox.setInternalState(mockBpos, "bpRegistration", mockReg); + when(mockBpm.get(any())).thenReturn(mockBpos); + Whitebox.setInternalState(mockDn, "blockPoolManager", mockBpm); + + // Set up DNConf with small socket timeout for fast test + DNConf mockDnConf = mock(DNConf.class); + Whitebox.setInternalState(mockDnConf, "socketTimeout", 100); + Whitebox.setInternalState(mockDnConf, "socketWriteTimeout", 100); + Whitebox.setInternalState(mockDnConf, "readaheadLength", 0L); + when(mockDnConf.getDataTransferServerTcpNoDelay()).thenReturn(true); + when(mockDn.getDnConf()).thenReturn(mockDnConf); + Whitebox.setInternalState(mockDn, "dnConf", mockDnConf); + + // Set up xserver for transfer throttler + DataXceiverServer mockXserver = mock(DataXceiverServer.class); + when(mockXserver.getTransferThrottler()).thenReturn(null); + Whitebox.setInternalState(mockDn, "xserver", mockXserver); + + // Set connectToDnViaHostname field + Whitebox.setInternalState(mockDn, "connectToDnViaHostname", false); + + // Set up saslClient mock that throws InvalidEncryptionKeyException + SaslDataTransferClient saslClient = + mock(SaslDataTransferClient.class); + when(saslClient.socketSend( + any(), any(), any(), any(), any(), any(DatanodeRegistration.class))) + .thenThrow(new InvalidEncryptionKeyException( + "test: encryption key expired")); + Whitebox.setInternalState(mockDn, "saslClient", saslClient); + + // Stub methods called in run() + doNothing().when(mockDn).incrementXmitsInProgress(); + doNothing().when(mockDn).decrementXmitsInProgress(); + when(mockDn.newSocket()).thenAnswer(inv -> new Socket()); + when(mockDn.getBlockAccessToken(any(), any(), any(), any())) + .thenReturn(new Token()); + + // Mock DataEncryptionKeyFactory to verify clearDataEncryptionKey + DataEncryptionKeyFactory keyFactory = + mock(DataEncryptionKeyFactory.class); + when(mockDn.getDataEncryptionKeyFactoryForBlock(any())) + .thenReturn(keyFactory); + + // Create target datanode info + DatanodeInfo targetDN = mock(DatanodeInfo.class); + when(targetDN.getXferAddr(anyBoolean())) + .thenReturn("127.0.0.1:" + port); + + // Create DataTransfer (package-private inner class) + ExtendedBlock block = new ExtendedBlock("bp-1", 1, 100, 1001); + DataNode.DataTransfer transfer = mockDn.new DataTransfer( + new DatanodeInfo[]{targetDN}, + new StorageType[]{StorageType.DISK}, + new String[]{"storage-1"}, + block, + BlockConstructionStage.PIPELINE_SETUP_CREATE, + ""); + + // Run the transfer - it handles exceptions internally + transfer.run(); + + // Verify clearDataEncryptionKey was called exactly once during retry. + // Without the fix, this verify will FAIL. + verify(keyFactory, times(1)).clearDataEncryptionKey(); + } + } +} \ No newline at end of file