HBASE-25518 Support separate child regions to different region servers (#3001)

Signed-off-by: stack <[email protected]>
sunhelly committed Mar 18, 2021
1 parent d93035a commit 585aca1
Showing 7 changed files with 313 additions and 66 deletions.
15 changes: 15 additions & 0 deletions hbase-common/src/main/java/org/apache/hadoop/hbase/HConstants.java
@@ -156,6 +156,21 @@ public enum OperationStatusCode {
/** Default value for the balancer period */
public static final int DEFAULT_HBASE_BALANCER_PERIOD = 300000;

/**
 * Config key for enabling/disabling the automatic separation of child regions to different
 * region servers during region splits. One child is kept on the server hosting the parent
 * region, and the other child is assigned to a random server.
 * See HBASE-25518.
 */
public static final String HBASE_ENABLE_SEPARATE_CHILD_REGIONS =
"hbase.master.auto.separate.child.regions.after.split.enabled";

/**
 * Default value for automatically separating child regions to different region servers
 * (set to "false" to keep both child regions on the server hosting the parent region).
 */
public static final boolean DEFAULT_HBASE_ENABLE_SEPARATE_CHILD_REGIONS = false;

/** The name of the ensemble table */
public static final TableName ENSEMBLE_TABLE_NAME = TableName.valueOf("hbase:ensemble");

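The new key is off by default. As a minimal usage sketch (not part of this commit; assumes a standard client/master setup), the switch can be set programmatically, or equivalently as a boolean property in hbase-site.xml:

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HConstants;

// Hypothetical sketch: enable daughter-region separation before starting the master.
Configuration conf = HBaseConfiguration.create();
conf.setBoolean(HConstants.HBASE_ENABLE_SEPARATE_CHILD_REGIONS, true);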
hbase-server/src/main/java/org/apache/hadoop/hbase/master/assignment/AssignmentManagerUtil.java
@@ -27,6 +27,7 @@
import java.util.stream.Stream;
import org.apache.commons.lang3.ArrayUtils;
import org.apache.hadoop.hbase.HBaseIOException;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.client.AsyncRegionServerAdmin;
import org.apache.hadoop.hbase.client.RegionInfo;
@@ -40,6 +41,7 @@
import org.apache.hadoop.hbase.shaded.protobuf.RequestConverter;
import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.GetRegionInfoRequest;
import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.GetRegionInfoResponse;
import static org.apache.hadoop.hbase.HConstants.DEFAULT_HBASE_ENABLE_SEPARATE_CHILD_REGIONS;

/**
* Utility for this assignment package only.
@@ -189,6 +191,67 @@ private static TransitRegionStateProcedure[] createAssignProcedures(MasterProcedureEnv
return ArrayUtils.addAll(primaryRegionProcs, replicaRegionAssignProcs);
}

/**
 * Create round-robin assign procedures for the given regions, according to the
 * {@code regionReplication}.
 * <p/>
 * For rolling back, we will submit procedures directly to the {@code ProcedureExecutor}, so it
 * is possible that we persist the newly scheduled procedures and then crash before persisting
 * the rollback state; when we arrive here the second time, some regions may already be
 * associated with a TRSP.
 * @param ignoreIfInTransition if true, return null (creating no procedures) when any of the
 *          given regions is already in transition; otherwise we assert that none of them is
 *          in transition.
 */
private static TransitRegionStateProcedure[] createRoundRobinAssignProcedures(
MasterProcedureEnv env, List<RegionInfo> regions, int regionReplication,
List<ServerName> serversToExclude, boolean ignoreIfInTransition) {
List<RegionInfo> regionsAndReplicas = new ArrayList<>(regions);
if (regionReplication != DEFAULT_REGION_REPLICA) {

// collect the replica region infos
List<RegionInfo> replicaRegionInfos =
new ArrayList<RegionInfo>(regions.size() * (regionReplication - 1));
for (RegionInfo hri : regions) {
// start the index from 1
for (int i = 1; i < regionReplication; i++) {
replicaRegionInfos.add(RegionReplicaUtil.getRegionInfoForReplica(hri, i));
}
}
regionsAndReplicas.addAll(replicaRegionInfos);
}
if (ignoreIfInTransition) {
for (RegionInfo region : regionsAndReplicas) {
if (env.getAssignmentManager().getRegionStates().getOrCreateRegionStateNode(region)
.isInTransition()) {
return null;
}
}
}
// create round robin procs. Note that we exclude the primary region's target server
return env.getAssignmentManager()
.createRoundRobinAssignProcedures(regionsAndReplicas, serversToExclude);
}

static TransitRegionStateProcedure[] createAssignProceduresForSplitDaughters(
MasterProcedureEnv env, List<RegionInfo> daughters, int regionReplication,
ServerName parentServer) {
if (env.getMasterConfiguration().getBoolean(HConstants.HBASE_ENABLE_SEPARATE_CHILD_REGIONS,
  DEFAULT_HBASE_ENABLE_SEPARATE_CHILD_REGIONS)) {
// keep one daughter on the parent region server
TransitRegionStateProcedure[] daughterOne =
createAssignProcedures(env, Collections.singletonList(daughters.get(0)),
regionReplication, parentServer, false);
// round robin assign the other daughter
TransitRegionStateProcedure[] daughterTwo =
createRoundRobinAssignProcedures(env, Collections.singletonList(daughters.get(1)),
regionReplication, Collections.singletonList(parentServer), false);
return ArrayUtils.addAll(daughterOne, daughterTwo);
}
return createAssignProceduresForOpeningNewRegions(env, daughters, regionReplication,
parentServer);
}

static TransitRegionStateProcedure[] createAssignProceduresForOpeningNewRegions(
MasterProcedureEnv env, List<RegionInfo> regions, int regionReplication,
ServerName targetServer) {
hbase-server/src/main/java/org/apache/hadoop/hbase/master/assignment/SplitTableRegionProcedure.java
@@ -869,7 +869,7 @@ private TransitRegionStateProcedure[] createAssignProcedures(MasterProcedureEnv
List<RegionInfo> hris = new ArrayList<RegionInfo>(2);
hris.add(daughterOneRI);
hris.add(daughterTwoRI);
-    return AssignmentManagerUtil.createAssignProceduresForOpeningNewRegions(env, hris,
+    return AssignmentManagerUtil.createAssignProceduresForSplitDaughters(env, hris,
getRegionReplication(env), getParentRegionServerName(env));
}

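Routing the daughter assignment through createAssignProceduresForSplitDaughters means the effect is visible from the client side after a split. A hedged sketch of such a check (not from this commit; verifyDaughterSeparation, conf, tableName, and splitKey are illustrative, and the crude sleep stands in for a proper wait on the split procedure):

import java.util.List;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HRegionLocation;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Admin;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;

static void verifyDaughterSeparation(Configuration conf, TableName tableName, byte[] splitKey)
    throws Exception {
  try (Connection conn = ConnectionFactory.createConnection(conf);
      Admin admin = conn.getAdmin()) {
    admin.split(tableName, splitKey);
    Thread.sleep(5000); // crude wait; a real check would poll the region states
    List<HRegionLocation> locations = conn.getRegionLocator(tableName).getAllRegionLocations();
    // With the flag enabled, the two daughter locations should name different servers.
    for (HRegionLocation loc : locations) {
      System.out.println(loc.getRegion().getEncodedName() + " -> " + loc.getServerName());
    }
  }
}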
hbase-server/src/test/java/org/apache/hadoop/hbase/master/assignment/AssignmentTestingUtil.java
@@ -24,12 +24,16 @@
import java.util.Set;
import org.apache.hadoop.hbase.HBaseTestingUtility;
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.Waiter;
import org.apache.hadoop.hbase.Waiter.ExplainingPredicate;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.RegionInfo;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.master.HMaster;
import org.apache.hadoop.hbase.master.RegionState.State;
import org.apache.hadoop.hbase.master.procedure.ProcedureSyncWait;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.Threads;
import org.apache.yetus.audience.InterfaceAudience;
import org.apache.yetus.audience.InterfaceStability;
@@ -152,4 +156,25 @@ public static boolean waitForAssignment(AssignmentManager am, RegionInfo regionInfo)
proc, 5L * 60 * 1000);
return true;
}

/**
 * Load {@code rowCount} rows into {@code tableName}, writing from both ends of the row range
 * toward the middle and flushing every fifth iteration so the data spans multiple store files.
 */
public static void insertData(final HBaseTestingUtility UTIL, final TableName tableName,
    int rowCount, int startRowNum, String... cfs) throws IOException {
Table t = UTIL.getConnection().getTable(tableName);
Put p;
for (int i = 0; i < rowCount / 2; i++) {
p = new Put(Bytes.toBytes("" + (startRowNum + i)));
for (String cf : cfs) {
p.addColumn(Bytes.toBytes(cf), Bytes.toBytes("q"), Bytes.toBytes(i));
}
t.put(p);
p = new Put(Bytes.toBytes("" + (startRowNum + rowCount - i - 1)));
for (String cf : cfs) {
p.addColumn(Bytes.toBytes(cf), Bytes.toBytes("q"), Bytes.toBytes(i));
}
t.put(p);
if (i % 5 == 0) {
UTIL.getAdmin().flush(tableName);
}
}
}
}
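A usage note: this helper mirrors what TestRegionSplit previously did privately, so a caller with a running mini cluster loads data in one line (the values here match the constants used in the test below):

insertData(UTIL, tableName, 60, 11, "cf");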
hbase-server/src/test/java/org/apache/hadoop/hbase/master/assignment/TestRegionSplit.java
@@ -17,22 +17,24 @@
*/
package org.apache.hadoop.hbase.master.assignment;

import static org.apache.hadoop.hbase.master.assignment.AssignmentTestingUtil.insertData;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
import java.io.IOException;
import java.util.List;
import java.util.Map;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseClassTestRule;
import org.apache.hadoop.hbase.HBaseTestingUtility;
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.StartMiniClusterOption;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.RegionInfo;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.client.TableDescriptor;
import org.apache.hadoop.hbase.master.procedure.MasterProcedureEnv;
import org.apache.hadoop.hbase.master.procedure.MasterProcedureTestingUtility;
import org.apache.hadoop.hbase.procedure2.ProcedureExecutor;
import org.apache.hadoop.hbase.procedure2.ProcedureTestingUtility;
import org.apache.hadoop.hbase.regionserver.HRegion;
import org.apache.hadoop.hbase.testclassification.MasterTests;
import org.apache.hadoop.hbase.testclassification.MediumTests;
import org.apache.hadoop.hbase.util.Bytes;
@@ -60,7 +62,7 @@ public class TestRegionSplit {

protected static final HBaseTestingUtility UTIL = new HBaseTestingUtility();

-  private static String ColumnFamilyName = "cf";
+  private static String columnFamilyName = "cf";

private static final int startRowNum = 11;
private static final int rowCount = 60;
@@ -94,7 +96,8 @@ public void setup() throws Exception {
UTIL.getHBaseCluster().getMaster().setCatalogJanitorEnabled(false);
// Disable compaction.
for (int i = 0; i < UTIL.getHBaseCluster().getLiveRegionServerThreads().size(); i++) {
-      UTIL.getHBaseCluster().getRegionServer(i).getCompactSplitThread().switchCompaction(false);
+      UTIL.getHBaseCluster().getRegionServer(i).getCompactSplitThread().switchCompaction(
+        false);
}
}

@@ -111,8 +114,8 @@ public void testSplitTableRegion() throws Exception {
final ProcedureExecutor<MasterProcedureEnv> procExec = getMasterProcedureExecutor();

RegionInfo[] regions =
-      MasterProcedureTestingUtility.createTable(procExec, tableName, null, ColumnFamilyName);
-    insertData(tableName);
+      MasterProcedureTestingUtility.createTable(procExec, tableName, null, columnFamilyName);
+    insertData(UTIL, tableName, rowCount, startRowNum, columnFamilyName);
int splitRowNum = startRowNum + rowCount / 2;
byte[] splitKey = Bytes.toBytes("" + splitRowNum);

@@ -121,7 +124,7 @@

// Split region of the table
long procId = procExec.submitProcedure(
-      new SplitTableRegionProcedure(procExec.getEnvironment(), regions[0], splitKey));
+        new SplitTableRegionProcedure(procExec.getEnvironment(), regions[0], splitKey));
// Wait the completion
ProcedureTestingUtility.waitProcedure(procExec, procId);
ProcedureTestingUtility.assertProcNotFailed(procExec, procId);
@@ -146,22 +149,12 @@
UTIL.getAdmin().enableTable(tableName);
Thread.sleep(500);

assertEquals("Table region not correct.", 2,
UTIL.getHBaseCluster().getRegions(tableName).size());
}

private void insertData(final TableName tableName) throws IOException {
Table t = UTIL.getConnection().getTable(tableName);
Put p;
for (int i = 0; i < rowCount / 2; i++) {
p = new Put(Bytes.toBytes("" + (startRowNum + i)));
p.addColumn(Bytes.toBytes(ColumnFamilyName), Bytes.toBytes("q1"), Bytes.toBytes(i));
t.put(p);
p = new Put(Bytes.toBytes("" + (startRowNum + rowCount - i - 1)));
p.addColumn(Bytes.toBytes(ColumnFamilyName), Bytes.toBytes("q1"), Bytes.toBytes(i));
t.put(p);
}
UTIL.getAdmin().flush(tableName);
List<HRegion> tableRegions = UTIL.getHBaseCluster().getRegions(tableName);
assertEquals("Table region not correct.", 2, tableRegions.size());
Map<RegionInfo, ServerName> regionInfoMap = UTIL.getHBaseCluster().getMaster()
.getAssignmentManager().getRegionStates().getRegionAssignments();
assertEquals(regionInfoMap.get(tableRegions.get(0).getRegionInfo()),
regionInfoMap.get(tableRegions.get(1).getRegionInfo()));
}

private ProcedureExecutor<MasterProcedureEnv> getMasterProcedureExecutor() {
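The new assertion pins down the default: with the flag off, both daughters remain on the same server. For the enabled case one would expect the inverse check; a hedged sketch, not taken from this commit:

import static org.junit.Assert.assertNotEquals;

// Hypothetical counterpart, assuming HBASE_ENABLE_SEPARATE_CHILD_REGIONS was set
// to true before the mini cluster started: the daughters should be separated.
Map<RegionInfo, ServerName> assignments = UTIL.getHBaseCluster().getMaster()
    .getAssignmentManager().getRegionStates().getRegionAssignments();
assertNotEquals(assignments.get(tableRegions.get(0).getRegionInfo()),
    assignments.get(tableRegions.get(1).getRegionInfo()));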
(2 remaining file diffs not loaded)
