-
Notifications
You must be signed in to change notification settings - Fork 8.8k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
HDFS-17453. IncrementalBlockReport can have race condition with Edit Log Tailer #6708
Merged
Merged
Changes from 5 commits
Commits
Show all changes
12 commits
Select commit
Hold shift + click to select a range
e9c7350
Fix race condition between IBR and Edit tailing on Standby
acdabfb
Add Unit Test and fix removal condition
bc2aead
Add comment
79d91cb
Add Unit Test
d637a41
Fix unit tests
4ec0e6c
Add better null handling
9ff9a64
Fix unit test
3ba3098
Fix checkstyle
91a7973
Fix TestDNFencing tests
181bccb
Remove unnecessary line
599d98d
Fix unit test
7e36db7
Fix unit tests
File filter
Filter by extension
Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -17,14 +17,25 @@ | |
*/ | ||
package org.apache.hadoop.hdfs.server.datanode; | ||
|
||
import static org.junit.Assert.assertEquals; | ||
import static org.junit.Assert.assertFalse; | ||
import static org.mockito.ArgumentMatchers.any; | ||
import static org.mockito.ArgumentMatchers.anyString; | ||
import static org.mockito.Mockito.atLeastOnce; | ||
import static org.mockito.Mockito.doAnswer; | ||
import static org.mockito.Mockito.times; | ||
|
||
import java.io.IOException; | ||
import java.util.ArrayList; | ||
import java.util.List; | ||
import java.util.concurrent.Phaser; | ||
import java.util.concurrent.TimeUnit; | ||
|
||
import org.apache.hadoop.fs.FileSystem; | ||
import org.apache.hadoop.fs.Path; | ||
import org.apache.hadoop.hdfs.protocol.ExtendedBlock; | ||
import org.apache.hadoop.hdfs.server.namenode.ha.HATestUtil; | ||
import org.mockito.invocation.InvocationOnMock; | ||
import org.slf4j.Logger; | ||
import org.slf4j.LoggerFactory; | ||
import org.apache.hadoop.conf.Configuration; | ||
|
@@ -55,6 +66,9 @@ public class TestIncrementalBlockReports { | |
private static final long DUMMY_BLOCK_ID = 5678; | ||
private static final long DUMMY_BLOCK_LENGTH = 1024 * 1024; | ||
private static final long DUMMY_BLOCK_GENSTAMP = 1000; | ||
private static final String TEST_FILE_DATA = "hello world"; | ||
private static final String TEST_FILE = "/TestStandbyBlockManagement"; | ||
private static final Path TEST_FILE_PATH = new Path(TEST_FILE); | ||
|
||
private MiniDFSCluster cluster = null; | ||
private Configuration conf; | ||
|
@@ -215,4 +229,95 @@ public void testReplaceReceivedBlock() throws InterruptedException, IOException | |
cluster = null; | ||
} | ||
} | ||
|
||
@Test | ||
public void testIBRRaceCondition() throws Exception { | ||
cluster.shutdown(); | ||
Configuration conf = new Configuration(); | ||
HAUtil.setAllowStandbyReads(conf, true); | ||
conf.setInt(DFSConfigKeys.DFS_HA_TAILEDITS_PERIOD_KEY, 1); | ||
cluster = new MiniDFSCluster.Builder(conf) | ||
.nnTopology(MiniDFSNNTopology.simpleHATopology()) | ||
.numDataNodes(3) | ||
.build(); | ||
try { | ||
cluster.waitActive(); | ||
cluster.transitionToActive(0); | ||
|
||
NameNode nn1 = cluster.getNameNode(0); | ||
NameNode nn2 = cluster.getNameNode(1); | ||
FileSystem fs = HATestUtil.configureFailoverFs(cluster, conf); | ||
List<InvocationOnMock> ibrsToStandby = new ArrayList<>(); | ||
List<DatanodeProtocolClientSideTranslatorPB> spies = new ArrayList<>(); | ||
Phaser ibrPhaser = new Phaser(1); | ||
for (DataNode dn : cluster.getDataNodes()) { | ||
DatanodeProtocolClientSideTranslatorPB nnSpy = | ||
InternalDataNodeTestUtils.spyOnBposToNN(dn, nn2); | ||
doAnswer((inv) -> { | ||
for (StorageReceivedDeletedBlocks srdb : | ||
inv.getArgument(2, StorageReceivedDeletedBlocks[].class)) { | ||
for (ReceivedDeletedBlockInfo block : srdb.getBlocks()) { | ||
if (block.getStatus().equals(BlockStatus.RECEIVED_BLOCK)) { | ||
ibrPhaser.arriveAndDeregister(); | ||
} | ||
} | ||
} | ||
ibrsToStandby.add(inv); | ||
return null; | ||
}).when(nnSpy).blockReceivedAndDeleted( | ||
any(DatanodeRegistration.class), | ||
anyString(), | ||
any(StorageReceivedDeletedBlocks[].class)); | ||
spies.add(nnSpy); | ||
} | ||
|
||
Thread.sleep(1000); | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Can we do better than sleep? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. waitFor() something is better. |
||
LOG.info("=================================="); | ||
ibrPhaser.bulkRegister(9); | ||
DFSTestUtil.writeFile(fs, TEST_FILE_PATH, TEST_FILE_DATA); | ||
DFSTestUtil.appendFile(fs, TEST_FILE_PATH, TEST_FILE_DATA); | ||
DFSTestUtil.appendFile(fs, TEST_FILE_PATH, TEST_FILE_DATA); | ||
HATestUtil.waitForStandbyToCatchUp(nn1, nn2); | ||
int phase = ibrPhaser.arrive(); | ||
ibrPhaser.awaitAdvanceInterruptibly(phase, 60, TimeUnit.SECONDS); | ||
for (InvocationOnMock sendIBRs : ibrsToStandby) { | ||
try { | ||
sendIBRs.callRealMethod(); | ||
} catch (Throwable t) { | ||
LOG.error("Exception thrown while calling sendIBRs: ", t); | ||
} | ||
} | ||
|
||
assertEquals("There should be 3 pending messages from DNs", 3, | ||
nn2.getNamesystem().getBlockManager().getPendingDataNodeMessageCount()); | ||
ibrsToStandby.clear(); | ||
ibrPhaser.bulkRegister(6); | ||
DFSTestUtil.appendFile(fs, TEST_FILE_PATH, TEST_FILE_DATA); | ||
DFSTestUtil.appendFile(fs, TEST_FILE_PATH, TEST_FILE_DATA); | ||
phase = ibrPhaser.arrive(); | ||
ibrPhaser.awaitAdvanceInterruptibly(phase, 60, TimeUnit.SECONDS); | ||
for (InvocationOnMock sendIBRs : ibrsToStandby) { | ||
try { | ||
sendIBRs.callRealMethod(); | ||
} catch (Throwable t) { | ||
LOG.error("Exception thrown while calling sendIBRs: ", t); | ||
} | ||
} | ||
ibrsToStandby.clear(); | ||
ibrPhaser.arriveAndDeregister(); | ||
ExtendedBlock block = DFSTestUtil.getFirstBlock(fs, TEST_FILE_PATH); | ||
HATestUtil.waitForStandbyToCatchUp(nn1, nn2); | ||
LOG.info("=================================="); | ||
|
||
cluster.transitionToStandby(0); | ||
cluster.transitionToActive(1); | ||
cluster.waitActive(1); | ||
|
||
assertEquals("There should not be any corrupt replicas", 0, | ||
nn2.getNamesystem().getBlockManager() | ||
.numCorruptReplicas(block.getLocalBlock())); | ||
} finally { | ||
cluster.shutdown(); | ||
} | ||
} | ||
} |
Oops, something went wrong.
Add this suggestion to a batch that can be applied as a single commit.
This suggestion is invalid because no changes were made to the code.
Suggestions cannot be applied while the pull request is closed.
Suggestions cannot be applied while viewing a subset of changes.
Only one suggestion per line can be applied in a batch.
Add this suggestion to a batch that can be applied as a single commit.
Applying suggestions on deleted lines is not supported.
You must change the existing code in this line in order to create a valid suggestion.
Outdated suggestions cannot be applied.
This suggestion has been applied or marked resolved.
Suggestions cannot be applied from pending reviews.
Suggestions cannot be applied on multi-line comments.
Suggestions cannot be applied while the pull request is queued to merge.
Suggestion cannot be applied right now. Please check back later.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We could make this more robust to nulls with:
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I added these null checks, but I needed to remove the reportedState null check because the null value is used by removeStoredBlock