HDFS-17453. IncrementalBlockReport can have race condition with Edit Log Tailer #6708

Merged · 12 commits · Apr 10, 2024
Changes from 5 commits
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/PendingDataNodeMessages.java
@@ -95,16 +95,27 @@ void removeAllMessagesForDatanode(DatanodeDescriptor dn) {

   void enqueueReportedBlock(DatanodeStorageInfo storageInfo, Block block,
       ReplicaState reportedState) {
+    long genStamp = block.getGenerationStamp();
+    Queue<ReportedBlockInfo> queue = null;
     if (BlockIdManager.isStripedBlockID(block.getBlockId())) {
       Block blkId = new Block(BlockIdManager.convertToStripedID(block
           .getBlockId()));
-      getBlockQueue(blkId).add(
-          new ReportedBlockInfo(storageInfo, new Block(block), reportedState));
+      queue = getBlockQueue(blkId);
     } else {
       block = new Block(block);
-      getBlockQueue(block).add(
-          new ReportedBlockInfo(storageInfo, block, reportedState));
+      queue = getBlockQueue(block);
     }
+    // We only want the latest non-future reported block to be queued for each
+    // DataNode. Otherwise, there can be a race condition that causes an old
+    // reported block to be kept in the queue until the SNN switches to ANN and
+    // the old reported block will be processed and marked as corrupt by the ANN.
+    // See HDFS-17453.
+    int size = queue.size();
+    if (queue.removeIf(rbi -> rbi.storageInfo.equals(storageInfo) &&
+        rbi.block.getGenerationStamp() < genStamp)) {
Member commented:
We could make this more robust to nulls with:

  void enqueueReportedBlock(DatanodeStorageInfo storageInfo, Block block,
      ReplicaState reportedState) {
    if (storageInfo == null || block == null || reportedState == null) {
        return;
    }
    ...
    if (queue.removeIf(rbi -> storageInfo.equals(rbi.storageInfo) &&
    ...

Contributor (author) replied:
I added these null checks, but I needed to remove the reportedState null check because the null value is used by removeStoredBlock.
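
A minimal sketch of the resulting guard (an assumption about the final shape, inferred from this reply; only storageInfo and block are checked):

  void enqueueReportedBlock(DatanodeStorageInfo storageInfo, Block block,
      ReplicaState reportedState) {
    // Null-safe guard. reportedState is intentionally NOT checked here:
    // removeStoredBlock legitimately enqueues entries with a null state.
    if (storageInfo == null || block == null) {
      return;
    }
    ...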

+      count -= (size - queue.size());
+    }
+    queue.add(new ReportedBlockInfo(storageInfo, new Block(block), reportedState));
     count++;
   }
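
Taken together, enqueueReportedBlock now keeps at most the newest-genstamp report per storage: a newer report from the same DataNode storage evicts any older queued ones before being added. A small hypothetical illustration of these semantics (names borrowed from testQueues below; not part of the patch):

    PendingDataNodeMessages msgs = new PendingDataNodeMessages();
    msgs.enqueueReportedBlock(storageInfo, block1Gs1, ReplicaState.FINALIZED);
    msgs.enqueueReportedBlock(storageInfo, block1Gs2, ReplicaState.FINALIZED);
    // The genstamp-1 entry was evicted when the genstamp-2 report arrived,
    // so only the latest report from this storage remains queued.
    assertEquals(1, msgs.count());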

hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestPendingDataNodeMessages.java
@@ -30,6 +30,7 @@
 import org.apache.hadoop.hdfs.MiniDFSCluster;
 import org.apache.hadoop.hdfs.MiniDFSNNTopology;
 import org.apache.hadoop.hdfs.protocol.Block;
+import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
 import org.apache.hadoop.hdfs.protocol.ErasureCodingPolicy;
 import org.apache.hadoop.hdfs.protocol.SystemErasureCodingPolicies;
 import org.apache.hadoop.hdfs.server.blockmanagement.PendingDataNodeMessages.ReportedBlockInfo;
@@ -52,11 +53,15 @@ public class TestPendingDataNodeMessages {

   @Test
   public void testQueues() {
-    DatanodeDescriptor fakeDN = DFSTestUtil.getLocalDatanodeDescriptor();
+    DatanodeDescriptor fakeDN1 = DFSTestUtil.getDatanodeDescriptor("localhost", 8898, "/default-rack");
+    DatanodeDescriptor fakeDN2 = DFSTestUtil.getDatanodeDescriptor("localhost", 8899, "/default-rack");
     DatanodeStorage storage = new DatanodeStorage("STORAGE_ID");
-    DatanodeStorageInfo storageInfo = new DatanodeStorageInfo(fakeDN, storage);
-    msgs.enqueueReportedBlock(storageInfo, block1Gs1, ReplicaState.FINALIZED);
-    msgs.enqueueReportedBlock(storageInfo, block1Gs2, ReplicaState.FINALIZED);
+    DatanodeStorageInfo storageInfo1 = new DatanodeStorageInfo(fakeDN1, storage);
+    DatanodeStorageInfo storageInfo2 = new DatanodeStorageInfo(fakeDN2, storage);
+    msgs.enqueueReportedBlock(storageInfo1, block1Gs1, ReplicaState.FINALIZED);
+    msgs.enqueueReportedBlock(storageInfo2, block1Gs1, ReplicaState.FINALIZED);
+    msgs.enqueueReportedBlock(storageInfo1, block1Gs2, ReplicaState.FINALIZED);
+    msgs.enqueueReportedBlock(storageInfo2, block1Gs2, ReplicaState.FINALIZED);

     assertEquals(2, msgs.count());

@@ -67,8 +72,8 @@ public void testQueues() {
     Queue<ReportedBlockInfo> q =
         msgs.takeBlockQueue(block1Gs2DifferentInstance);
     assertEquals(
-        "ReportedBlockInfo [block=blk_1_1, dn=127.0.0.1:9866, reportedState=FINALIZED]," +
-        "ReportedBlockInfo [block=blk_1_2, dn=127.0.0.1:9866, reportedState=FINALIZED]",
+        "ReportedBlockInfo [block=blk_1_2, dn=/default-rack/localhost:8898, reportedState=FINALIZED]," +
+        "ReportedBlockInfo [block=blk_1_2, dn=/default-rack/localhost:8899, reportedState=FINALIZED]",
         Joiner.on(",").join(q));
     assertEquals(0, msgs.count());

hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestIncrementalBlockReports.java
@@ -17,14 +17,25 @@
  */
 package org.apache.hadoop.hdfs.server.datanode;

+import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
 import static org.mockito.ArgumentMatchers.any;
 import static org.mockito.ArgumentMatchers.anyString;
 import static org.mockito.Mockito.atLeastOnce;
+import static org.mockito.Mockito.doAnswer;
 import static org.mockito.Mockito.times;

 import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.Phaser;
+import java.util.concurrent.TimeUnit;

+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
+import org.apache.hadoop.hdfs.server.namenode.ha.HATestUtil;
+import org.mockito.invocation.InvocationOnMock;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.apache.hadoop.conf.Configuration;
@@ -55,6 +66,9 @@ public class TestIncrementalBlockReports {
   private static final long DUMMY_BLOCK_ID = 5678;
   private static final long DUMMY_BLOCK_LENGTH = 1024 * 1024;
   private static final long DUMMY_BLOCK_GENSTAMP = 1000;
+  private static final String TEST_FILE_DATA = "hello world";
+  private static final String TEST_FILE = "/TestStandbyBlockManagement";
+  private static final Path TEST_FILE_PATH = new Path(TEST_FILE);

   private MiniDFSCluster cluster = null;
   private Configuration conf;
@@ -215,4 +229,95 @@ public void testReplaceReceivedBlock() throws InterruptedException, IOException
       cluster = null;
     }
   }

+  @Test
+  public void testIBRRaceCondition() throws Exception {
+    cluster.shutdown();
+    Configuration conf = new Configuration();
+    HAUtil.setAllowStandbyReads(conf, true);
+    conf.setInt(DFSConfigKeys.DFS_HA_TAILEDITS_PERIOD_KEY, 1);
+    cluster = new MiniDFSCluster.Builder(conf)
+        .nnTopology(MiniDFSNNTopology.simpleHATopology())
+        .numDataNodes(3)
+        .build();
+    try {
+      cluster.waitActive();
+      cluster.transitionToActive(0);
+
+      NameNode nn1 = cluster.getNameNode(0);
+      NameNode nn2 = cluster.getNameNode(1);
+      FileSystem fs = HATestUtil.configureFailoverFs(cluster, conf);
+      List<InvocationOnMock> ibrsToStandby = new ArrayList<>();
+      List<DatanodeProtocolClientSideTranslatorPB> spies = new ArrayList<>();
+      Phaser ibrPhaser = new Phaser(1);
+      for (DataNode dn : cluster.getDataNodes()) {
+        DatanodeProtocolClientSideTranslatorPB nnSpy =
+            InternalDataNodeTestUtils.spyOnBposToNN(dn, nn2);
+        doAnswer((inv) -> {
+          for (StorageReceivedDeletedBlocks srdb :
+              inv.getArgument(2, StorageReceivedDeletedBlocks[].class)) {
+            for (ReceivedDeletedBlockInfo block : srdb.getBlocks()) {
+              if (block.getStatus().equals(BlockStatus.RECEIVED_BLOCK)) {
+                ibrPhaser.arriveAndDeregister();
+              }
+            }
+          }
+          ibrsToStandby.add(inv);
+          return null;
+        }).when(nnSpy).blockReceivedAndDeleted(
+            any(DatanodeRegistration.class),
+            anyString(),
+            any(StorageReceivedDeletedBlocks[].class));
+        spies.add(nnSpy);
+      }
+
+      Thread.sleep(1000);
Member commented:
Can we do better than sleep?

Member commented:
waitFor() something is better.
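
For reference, a minimal sketch of the polling alternative with GenericTestUtils.waitFor (org.apache.hadoop.test.GenericTestUtils); the condition shown, all DataNodes fully started, is an assumed stand-in for whatever the fixed sleep is waiting on, and is not part of this patch:

      // Hypothetical replacement for Thread.sleep(1000): poll for a concrete
      // condition instead of sleeping a fixed interval. The diff shown here
      // still uses the sleep.
      GenericTestUtils.waitFor(
          () -> cluster.getDataNodes().stream()
              .allMatch(DataNode::isDatanodeFullyStarted),
          100, 10000); // check every 100 ms, time out after 10 s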

+      LOG.info("==================================");
+      ibrPhaser.bulkRegister(9);
+      DFSTestUtil.writeFile(fs, TEST_FILE_PATH, TEST_FILE_DATA);
+      DFSTestUtil.appendFile(fs, TEST_FILE_PATH, TEST_FILE_DATA);
+      DFSTestUtil.appendFile(fs, TEST_FILE_PATH, TEST_FILE_DATA);
+      HATestUtil.waitForStandbyToCatchUp(nn1, nn2);
+      int phase = ibrPhaser.arrive();
+      ibrPhaser.awaitAdvanceInterruptibly(phase, 60, TimeUnit.SECONDS);
+      for (InvocationOnMock sendIBRs : ibrsToStandby) {
+        try {
+          sendIBRs.callRealMethod();
+        } catch (Throwable t) {
+          LOG.error("Exception thrown while calling sendIBRs: ", t);
+        }
+      }
+
+      assertEquals("There should be 3 pending messages from DNs", 3,
+          nn2.getNamesystem().getBlockManager().getPendingDataNodeMessageCount());
+      ibrsToStandby.clear();
+      ibrPhaser.bulkRegister(6);
+      DFSTestUtil.appendFile(fs, TEST_FILE_PATH, TEST_FILE_DATA);
+      DFSTestUtil.appendFile(fs, TEST_FILE_PATH, TEST_FILE_DATA);
+      phase = ibrPhaser.arrive();
+      ibrPhaser.awaitAdvanceInterruptibly(phase, 60, TimeUnit.SECONDS);
+      for (InvocationOnMock sendIBRs : ibrsToStandby) {
+        try {
+          sendIBRs.callRealMethod();
+        } catch (Throwable t) {
+          LOG.error("Exception thrown while calling sendIBRs: ", t);
+        }
+      }
+      ibrsToStandby.clear();
+      ibrPhaser.arriveAndDeregister();
+      ExtendedBlock block = DFSTestUtil.getFirstBlock(fs, TEST_FILE_PATH);
+      HATestUtil.waitForStandbyToCatchUp(nn1, nn2);
+      LOG.info("==================================");
+
+      cluster.transitionToStandby(0);
+      cluster.transitionToActive(1);
+      cluster.waitActive(1);
+
+      assertEquals("There should not be any corrupt replicas", 0,
+          nn2.getNamesystem().getBlockManager()
+              .numCorruptReplicas(block.getLocalBlock()));
+    } finally {
+      cluster.shutdown();
+    }
+  }
 }