diff --git a/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/ProcedureYieldException.java b/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/ProcedureYieldException.java index 0487ac5b094a..dbb998132be5 100644 --- a/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/ProcedureYieldException.java +++ b/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/ProcedureYieldException.java @@ -15,16 +15,21 @@ * See the License for the specific language governing permissions and * limitations under the License. */ - package org.apache.hadoop.hbase.procedure2; import org.apache.yetus.audience.InterfaceAudience; import org.apache.yetus.audience.InterfaceStability; -// TODO: Not used yet +/** + * Indicate that a procedure wants to be rescheduled. Usually because there is something wrong but + * we do not want to fail the procedure. + *

+ * TODO: need to support scheduling after a delay. + */ @InterfaceAudience.Private @InterfaceStability.Stable public class ProcedureYieldException extends ProcedureException { + /** default constructor */ public ProcedureYieldException() { super(); diff --git a/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ReplicationUtils.java b/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ReplicationUtils.java index d94cb008683b..e402d0f25843 100644 --- a/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ReplicationUtils.java +++ b/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ReplicationUtils.java @@ -41,6 +41,8 @@ public final class ReplicationUtils { public static final String REPLICATION_ATTR_NAME = "__rep__"; + public static final String REMOTE_WAL_DIR_NAME = "remoteWALs"; + private ReplicationUtils() { } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/MasterFileSystem.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/MasterFileSystem.java index 864be029008e..7ccbd7180e81 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/MasterFileSystem.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/MasterFileSystem.java @@ -41,6 +41,7 @@ import org.apache.hadoop.hbase.mob.MobConstants; import org.apache.hadoop.hbase.procedure2.store.wal.WALProcedureStore; import org.apache.hadoop.hbase.regionserver.HRegion; +import org.apache.hadoop.hbase.replication.ReplicationUtils; import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.hbase.util.FSTableDescriptors; import org.apache.hadoop.hbase.util.FSUtils; @@ -133,7 +134,6 @@ public MasterFileSystem(Configuration conf) throws IOException { * Idempotent. 
*/ private void createInitialFileSystemLayout() throws IOException { - final String[] protectedSubDirs = new String[] { HConstants.BASE_NAMESPACE_DIR, HConstants.HFILE_ARCHIVE_DIRECTORY, @@ -145,7 +145,8 @@ private void createInitialFileSystemLayout() throws IOException { HConstants.HREGION_LOGDIR_NAME, HConstants.HREGION_OLDLOGDIR_NAME, HConstants.CORRUPT_DIR_NAME, - WALProcedureStore.MASTER_PROCEDURE_LOGDIR + WALProcedureStore.MASTER_PROCEDURE_LOGDIR, + ReplicationUtils.REMOTE_WAL_DIR_NAME }; // check if the root directory exists checkRootDir(this.rootdir, conf, this.fs); @@ -192,7 +193,9 @@ public FileSystem getFileSystem() { return this.fs; } - protected FileSystem getWALFileSystem() { return this.walFs; } + public FileSystem getWALFileSystem() { + return this.walFs; + } public Configuration getConfiguration() { return this.conf; @@ -234,13 +237,9 @@ public ClusterId getClusterId() { } /** - * Get the rootdir. Make sure its wholesome and exists before returning. - * @param rd - * @param c - * @param fs - * @return hbase.rootdir (after checks for existence and bootstrapping if - * needed populating the directory with necessary bootup files). - * @throws IOException + * Get the rootdir. Make sure it's wholesome and exists before returning. + * @return hbase.rootdir (after checks for existence and bootstrapping if needed populating the + * directory with necessary bootup files).
*/ private Path checkRootDir(final Path rd, final Configuration c, final FileSystem fs) throws IOException { diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/MasterProcedureEnv.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/MasterProcedureEnv.java index 0ec932ce47d3..2c01b166b4eb 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/MasterProcedureEnv.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/MasterProcedureEnv.java @@ -25,6 +25,7 @@ import org.apache.hadoop.hbase.conf.ConfigurationObserver; import org.apache.hadoop.hbase.ipc.RpcServer; import org.apache.hadoop.hbase.master.MasterCoprocessorHost; +import org.apache.hadoop.hbase.master.MasterFileSystem; import org.apache.hadoop.hbase.master.MasterServices; import org.apache.hadoop.hbase.master.assignment.AssignmentManager; import org.apache.hadoop.hbase.master.replication.ReplicationPeerManager; @@ -139,6 +140,10 @@ public ReplicationPeerManager getReplicationPeerManager() { return master.getReplicationPeerManager(); } + public MasterFileSystem getMasterFileSystem() { + return master.getMasterFileSystem(); + } + public boolean isRunning() { if (this.master == null || this.master.getMasterProcedureExecutor() == null) return false; return master.getMasterProcedureExecutor().isRunning(); diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/TransitPeerSyncReplicationStateProcedure.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/TransitPeerSyncReplicationStateProcedure.java index 69404a08507d..cc51890c8c2a 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/TransitPeerSyncReplicationStateProcedure.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/TransitPeerSyncReplicationStateProcedure.java @@ -20,14 +20,18 @@ import java.io.IOException; import java.util.List; import 
java.util.stream.Collectors; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; import org.apache.hadoop.hbase.client.RegionInfo; import org.apache.hadoop.hbase.client.replication.ReplicationPeerConfigUtil; import org.apache.hadoop.hbase.master.MasterCoprocessorHost; +import org.apache.hadoop.hbase.master.MasterFileSystem; import org.apache.hadoop.hbase.master.procedure.MasterProcedureEnv; import org.apache.hadoop.hbase.procedure2.ProcedureStateSerializer; import org.apache.hadoop.hbase.procedure2.ProcedureSuspendedException; import org.apache.hadoop.hbase.procedure2.ProcedureYieldException; import org.apache.hadoop.hbase.replication.ReplicationException; +import org.apache.hadoop.hbase.replication.ReplicationUtils; import org.apache.hadoop.hbase.replication.SyncReplicationState; import org.apache.yetus.audience.InterfaceAudience; import org.slf4j.Logger; @@ -44,7 +48,7 @@ public class TransitPeerSyncReplicationStateProcedure extends AbstractPeerProcedure { private static final Logger LOG = - LoggerFactory.getLogger(TransitPeerSyncReplicationStateProcedure.class); + LoggerFactory.getLogger(TransitPeerSyncReplicationStateProcedure.class); private SyncReplicationState fromState; @@ -67,8 +71,8 @@ public PeerOperationType getPeerOperationType() { protected void serializeStateData(ProcedureStateSerializer serializer) throws IOException { super.serializeStateData(serializer); TransitPeerSyncReplicationStateStateData.Builder builder = - TransitPeerSyncReplicationStateStateData.newBuilder() - .setToState(ReplicationPeerConfigUtil.toSyncReplicationState(toState)); + TransitPeerSyncReplicationStateStateData.newBuilder() + .setToState(ReplicationPeerConfigUtil.toSyncReplicationState(toState)); if (fromState != null) { builder.setFromState(ReplicationPeerConfigUtil.toSyncReplicationState(fromState)); } @@ -79,7 +83,7 @@ protected void serializeStateData(ProcedureStateSerializer serializer) throws IO protected void 
deserializeStateData(ProcedureStateSerializer serializer) throws IOException { super.deserializeStateData(serializer); TransitPeerSyncReplicationStateStateData data = - serializer.deserialize(TransitPeerSyncReplicationStateStateData.class); + serializer.deserialize(TransitPeerSyncReplicationStateStateData.class); toState = ReplicationPeerConfigUtil.toSyncReplicationState(data.getToState()); if (data.hasFromState()) { fromState = ReplicationPeerConfigUtil.toSyncReplicationState(data.getFromState()); @@ -205,7 +209,22 @@ protected Flow executeFromState(MasterProcedureEnv env, } return Flow.HAS_MORE_STATE; case CREATE_DIR_FOR_REMOTE_WAL: - // TODO: create wal for write remote wal + MasterFileSystem mfs = env.getMasterFileSystem(); + Path remoteWALDir = new Path(mfs.getWALRootDir(), ReplicationUtils.REMOTE_WAL_DIR_NAME); + Path remoteWALDirForPeer = new Path(remoteWALDir, peerId); + FileSystem walFs = mfs.getWALFileSystem(); + try { + if (walFs.exists(remoteWALDirForPeer)) { + LOG.warn("Wal dir {} already exists, usually this should not happen, continue anyway", + remoteWALDirForPeer); + } else if (!walFs.mkdirs(remoteWALDirForPeer)) { + LOG.warn("Can not create remote wal dir {}", remoteWALDirForPeer); + throw new ProcedureYieldException(); + } + } catch (IOException e) { + LOG.warn("Failed to create remote wal dir {}", remoteWALDirForPeer, e); + throw new ProcedureYieldException(); + } setNextState( PeerSyncReplicationStateTransitionState.POST_PEER_SYNC_REPLICATION_STATE_TRANSITION); return Flow.HAS_MORE_STATE; diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestSyncReplication.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestSyncReplication.java index acddc4a5aa47..196019d7a673 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestSyncReplication.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestSyncReplication.java @@ -19,7 +19,9 @@ import static 
org.hamcrest.CoreMatchers.containsString; import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertThat; +import static org.junit.Assert.assertTrue; import java.io.IOException; import java.util.ArrayList; @@ -45,6 +47,7 @@ import org.apache.hadoop.hbase.client.Table; import org.apache.hadoop.hbase.client.TableDescriptor; import org.apache.hadoop.hbase.client.TableDescriptorBuilder; +import org.apache.hadoop.hbase.master.MasterFileSystem; import org.apache.hadoop.hbase.regionserver.HRegion; import org.apache.hadoop.hbase.testclassification.LargeTests; import org.apache.hadoop.hbase.testclassification.ReplicationTests; @@ -154,8 +157,13 @@ private void assertDisallow(Table table, TableAction action) throws IOException @Test public void testStandby() throws Exception { + MasterFileSystem mfs = UTIL2.getHBaseCluster().getMaster().getMasterFileSystem(); + Path remoteWALDir = new Path(mfs.getWALRootDir(), ReplicationUtils.REMOTE_WAL_DIR_NAME); + Path remoteWALDirForPeer = new Path(remoteWALDir, PEER_ID); + assertFalse(mfs.getWALFileSystem().exists(remoteWALDirForPeer)); UTIL2.getAdmin().transitReplicationPeerSyncReplicationState(PEER_ID, SyncReplicationState.STANDBY); + assertTrue(mfs.getWALFileSystem().exists(remoteWALDirForPeer)); try (Table table = UTIL2.getConnection().getTable(TABLE_NAME)) { assertDisallow(table, t -> t.get(new Get(Bytes.toBytes("row")))); assertDisallow(table,