Skip to content

Commit

Permalink
HBASE-22404 Open/Close region request may be executed twice when mast…
Browse files Browse the repository at this point in the history
…er restart
  • Loading branch information
infraio authored May 16, 2019
1 parent 7878389 commit bdd2fc6
Show file tree
Hide file tree
Showing 16 changed files with 159 additions and 54 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import java.lang.Thread.UncaughtExceptionHandler;
import java.util.HashSet;
import java.util.List;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.DelayQueue;
Expand Down Expand Up @@ -232,8 +233,12 @@ public RemoteProcedure getRemoteProcedure() {
public interface RemoteProcedure<TEnv, TRemote> {
/**
* For building the remote operation.
* May be empty if there is no need to send a remote call. Usually, this means the RemoteProcedure
* has already finished. This is possible because we may have already sent the procedure to the RS
* and then the rpc connection broke, so the executeProcedures call failed, but the RS did receive
* the procedure, executed it and reported back before we retried.
*/
RemoteOperation remoteCallBuild(TEnv env, TRemote remote);
Optional<RemoteOperation> remoteCallBuild(TEnv env, TRemote remote);

/**
* Called when the executeProcedure call is failed.
Expand Down Expand Up @@ -277,8 +282,8 @@ protected ArrayListMultimap<Class<?>, RemoteOperation> buildAndGroupRequestByTyp
final TRemote remote, final Set<RemoteProcedure> remoteProcedures) {
final ArrayListMultimap<Class<?>, RemoteOperation> requestByType = ArrayListMultimap.create();
for (RemoteProcedure proc : remoteProcedures) {
RemoteOperation operation = proc.remoteCallBuild(env, remote);
requestByType.put(operation.getClass(), operation);
Optional<RemoteOperation> operation = proc.remoteCallBuild(env, remote);
operation.ifPresent(op -> requestByType.put(op.getClass(), op));
}
return requestByType;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,8 @@
package org.apache.hadoop.hbase.master.assignment;

import java.io.IOException;
import java.util.Optional;

import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.exceptions.UnexpectedStateException;
import org.apache.hadoop.hbase.master.procedure.MasterProcedureEnv;
Expand Down Expand Up @@ -117,9 +119,9 @@ protected void reportTransition(final MasterProcedureEnv env, final RegionStateN
}

@Override
public RemoteOperation remoteCallBuild(final MasterProcedureEnv env,
public Optional<RemoteOperation> remoteCallBuild(final MasterProcedureEnv env,
final ServerName serverName) {
return null;
return Optional.empty();
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,7 @@ public TableOperationType getTableOperationType() {
}

@Override
public RemoteOperation remoteCallBuild(MasterProcedureEnv env, ServerName remote) {
public RemoteOperation newRemoteOperation() {
return new RegionCloseOperation(this, region, getProcId(), assignCandidate);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,7 @@ public TableOperationType getTableOperationType() {
}

@Override
public RemoteOperation remoteCallBuild(MasterProcedureEnv env, ServerName remote) {
public RemoteOperation newRemoteOperation() {
return new RegionOpenOperation(this, region, getProcId());
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,8 @@
package org.apache.hadoop.hbase.master.assignment;

import java.io.IOException;
import java.util.Optional;

import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.TableName;
Expand All @@ -32,6 +34,7 @@
import org.apache.hadoop.hbase.procedure2.ProcedureSuspendedException;
import org.apache.hadoop.hbase.procedure2.ProcedureUtil;
import org.apache.hadoop.hbase.procedure2.ProcedureYieldException;
import org.apache.hadoop.hbase.procedure2.RemoteProcedureDispatcher;
import org.apache.hadoop.hbase.procedure2.RemoteProcedureDispatcher.RemoteProcedure;
import org.apache.hadoop.hbase.procedure2.RemoteProcedureException;
import org.apache.hadoop.hbase.util.RetryCounter;
Expand Down Expand Up @@ -81,6 +84,19 @@ protected RegionRemoteProcedureBase(TransitRegionStateProcedure parent, RegionIn
parent.attachRemoteProc(this);
}

@Override
public Optional<RemoteProcedureDispatcher.RemoteOperation> remoteCallBuild(MasterProcedureEnv env,
    ServerName remote) {
  // Once the state is REPORT_SUCCEED the region server has already executed this remote
  // open/close request, so RSProcedureDispatcher has nothing left to send: hand back an empty
  // Optional. In every other state, wrap the operation built by the subclass.
  boolean alreadyReported =
      state == RegionRemoteProcedureBaseState.REGION_REMOTE_PROCEDURE_REPORT_SUCCEED;
  return alreadyReported ? Optional.empty() : Optional.of(newRemoteOperation());
}

/**
* Creates the operation that {@link #remoteCallBuild(MasterProcedureEnv, ServerName)} wraps in an
* {@link Optional} when the request still has to be sent to the region server.
* @return the remote operation for this procedure; must not be null, because the result is
*         passed to {@link Optional#of(Object)}
*/
protected abstract RemoteProcedureDispatcher.RemoteOperation newRemoteOperation();

@Override
public void remoteOperationCompleted(MasterProcedureEnv env) {
// should not be called since we use reportRegionStateTransition to report the result
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
package org.apache.hadoop.hbase.master.assignment;

import java.io.IOException;
import java.util.Optional;
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.TableName;
Expand Down Expand Up @@ -124,7 +125,8 @@ protected abstract void reportTransition(MasterProcedureEnv env, RegionStateNode
TransitionCode code, long seqId) throws UnexpectedStateException;

@Override
public abstract RemoteOperation remoteCallBuild(MasterProcedureEnv env, ServerName serverName);
public abstract Optional<RemoteOperation> remoteCallBuild(MasterProcedureEnv env,
ServerName serverName);

protected abstract boolean remoteCallFailed(MasterProcedureEnv env, RegionStateNode regionNode,
IOException exception);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,8 @@
package org.apache.hadoop.hbase.master.assignment;

import java.io.IOException;
import java.util.Optional;

import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.exceptions.UnexpectedStateException;
import org.apache.hadoop.hbase.master.procedure.MasterProcedureEnv;
Expand Down Expand Up @@ -126,9 +128,9 @@ protected void finishTransition(final MasterProcedureEnv env, final RegionStateN
}

@Override
public RemoteOperation remoteCallBuild(final MasterProcedureEnv env,
public Optional<RemoteOperation> remoteCallBuild(final MasterProcedureEnv env,
final ServerName serverName) {
return null;
return Optional.empty();
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
package org.apache.hadoop.hbase.master.procedure;

import java.io.IOException;
import java.util.Optional;

import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.DoNotRetryIOException;
Expand Down Expand Up @@ -84,11 +85,12 @@ protected void deserializeStateData(ProcedureStateSerializer serializer) throws
}

@Override
public RemoteProcedureDispatcher.RemoteOperation remoteCallBuild(MasterProcedureEnv env,
public Optional<RemoteProcedureDispatcher.RemoteOperation> remoteCallBuild(MasterProcedureEnv env,
ServerName serverName) {
return new RSProcedureDispatcher.ServerOperation(this, getProcId(), SplitWALCallable.class,
MasterProcedureProtos.SplitWALParameter.newBuilder().setWalPath(walPath).build()
.toByteArray());
return Optional
.of(new RSProcedureDispatcher.ServerOperation(this, getProcId(), SplitWALCallable.class,
MasterProcedureProtos.SplitWALParameter.newBuilder().setWalPath(walPath).build()
.toByteArray()));
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
package org.apache.hadoop.hbase.master.procedure;

import java.io.IOException;
import java.util.Optional;

import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.procedure2.ProcedureStateSerializer;
Expand Down Expand Up @@ -74,15 +75,13 @@ protected void deserializeStateData(ProcedureStateSerializer serializer) throws
}

@Override
public RemoteProcedureDispatcher.RemoteOperation
remoteCallBuild(MasterProcedureEnv masterProcedureEnv, ServerName remote) {
public Optional<RemoteProcedureDispatcher.RemoteOperation> remoteCallBuild(
MasterProcedureEnv masterProcedureEnv, ServerName remote) {
assert targetServer.equals(remote);
return new RSProcedureDispatcher.ServerOperation(this, getProcId(),
SwitchRpcThrottleRemoteCallable.class,
SwitchRpcThrottleRemoteStateData.newBuilder()
.setTargetServer(ProtobufUtil.toServerName(remote))
.setRpcThrottleEnabled(rpcThrottleEnabled).build()
.toByteArray());
return Optional.of(new RSProcedureDispatcher.ServerOperation(this, getProcId(),
SwitchRpcThrottleRemoteCallable.class, SwitchRpcThrottleRemoteStateData.newBuilder()
.setTargetServer(ProtobufUtil.toServerName(remote))
.setRpcThrottleEnabled(rpcThrottleEnabled).build().toByteArray()));
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,8 @@
package org.apache.hadoop.hbase.master.replication;

import java.io.IOException;
import java.util.Optional;

import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.master.procedure.MasterProcedureEnv;
import org.apache.hadoop.hbase.master.procedure.PeerProcedureInterface;
Expand Down Expand Up @@ -112,12 +114,12 @@ private static PeerOperationType toPeerOperationType(PeerModificationType type)
}

@Override
public RemoteOperation remoteCallBuild(MasterProcedureEnv env, ServerName remote) {
public Optional<RemoteOperation> remoteCallBuild(MasterProcedureEnv env, ServerName remote) {
assert targetServer.equals(remote);
return new ServerOperation(this, getProcId(), RefreshPeerCallable.class,
return Optional.of(new ServerOperation(this, getProcId(), RefreshPeerCallable.class,
RefreshPeerParameter.newBuilder().setPeerId(peerId).setType(toPeerModificationType(type))
.setTargetServer(ProtobufUtil.toServerName(remote)).setStage(stage).build()
.toByteArray());
.toByteArray()));
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.Optional;

import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.master.procedure.MasterProcedureEnv;
Expand Down Expand Up @@ -62,13 +63,14 @@ public SyncReplicationReplayWALRemoteProcedure(String peerId, List<String> wals,
}

@Override
public RemoteOperation remoteCallBuild(MasterProcedureEnv env, ServerName remote) {
public Optional<RemoteOperation> remoteCallBuild(MasterProcedureEnv env, ServerName remote) {
ReplaySyncReplicationWALParameter.Builder builder =
ReplaySyncReplicationWALParameter.newBuilder();
ReplaySyncReplicationWALParameter.newBuilder();
builder.setPeerId(peerId);
wals.stream().forEach(builder::addWal);
return new ServerOperation(this, getProcId(), ReplaySyncReplicationWALCallable.class,
builder.build().toByteArray());
return Optional
.of(new ServerOperation(this, getProcId(), ReplaySyncReplicationWALCallable.class,
builder.build().toByteArray()));
}

protected void complete(MasterProcedureEnv env, Throwable error) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,7 @@
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ConcurrentSkipListMap;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import java.util.function.Function;
Expand Down Expand Up @@ -191,6 +192,8 @@
import org.apache.hbase.thirdparty.com.google.common.annotations.VisibleForTesting;
import org.apache.hbase.thirdparty.com.google.common.base.Preconditions;
import org.apache.hbase.thirdparty.com.google.common.base.Throwables;
import org.apache.hbase.thirdparty.com.google.common.cache.Cache;
import org.apache.hbase.thirdparty.com.google.common.cache.CacheBuilder;
import org.apache.hbase.thirdparty.com.google.common.collect.Maps;
import org.apache.hbase.thirdparty.com.google.protobuf.BlockingRpcChannel;
import org.apache.hbase.thirdparty.com.google.protobuf.RpcController;
Expand Down Expand Up @@ -255,6 +258,18 @@ public class HRegionServer extends HasThread implements
protected final ConcurrentMap<byte[], Boolean> regionsInTransitionInRS =
new ConcurrentSkipListMap<>(Bytes.BYTES_COMPARATOR);

/**
* Ids of the open/close region procedures registered by {@link #submitRegionProcedure(long)}.
* An id is removed again by {@link #finishRegionProcedure(long)}. Each id is stored as both key
* and value.
*/
private final ConcurrentMap<Long, Long> submittedRegionProcedures = new ConcurrentHashMap<>();
/**
* Ids of the open/close region procedures recorded by {@link #finishRegionProcedure(long)}, i.e.
* those that have already been executed. An entry expires 600 seconds after its last access.
* See {@link #submitRegionProcedure(long)}.
*/
private final Cache<Long, Long> executedRegionProcedures =
CacheBuilder.newBuilder().expireAfterAccess(600, TimeUnit.SECONDS).build();

// Cache flushing
protected MemStoreFlusher cacheFlusher;

Expand Down Expand Up @@ -3882,6 +3897,51 @@ void reportProcedureDone(ReportProcedureDoneRequest request) throws IOException
}
}

/**
 * Decides whether an open/close region procedure may be handed to the executor, ignoring
 * procedures that have already been submitted or executed.
 * <p>
 * When a master with an unfinished open/close region procedure restarts, the new active master
 * may send a duplicate open/close region request to this region server. The request is submitted
 * to a thread pool and executed there, so the ids of submitted procedures are tracked first.
 * <p>
 * After the request has been executed and the region transition has been reported successfully,
 * the id is moved to the executed procedures cache, see {@link #finishRegionProcedure(long)}.
 * Once the transition has been reported the master will not send the request again, and a
 * duplicate request that is still in flight is not expected to be delayed by more than 600
 * seconds, so entries of the executed procedures cache expire after 600 seconds.
 * <p>
 * See HBASE-22404 for more details.
 *
 * @param procId the id of the open/close region procedure
 * @return true if the procedure can be submitted, false if it is a duplicate to be ignored
 */
boolean submitRegionProcedure(long procId) {
  // NOTE(review): -1 is presumably the "no procedure id" marker; such requests are never
  // de-duplicated.
  if (procId == -1) {
    return true;
  }
  // Claim the id before looking at the executed cache. finishRegionProcedure records the id as
  // executed before it releases the claim, so checking in this order cannot miss a procedure
  // that finishes concurrently.
  if (submittedRegionProcedures.putIfAbsent(procId, procId) != null) {
    LOG.warn("Received procedure pid={}, which already submitted, just ignore it", procId);
    return false;
  }
  // Ignore the region procedures which have already been executed.
  if (executedRegionProcedures.getIfPresent(procId) != null) {
    // No handler will run for this request, so finishRegionProcedure will never be called for
    // it: release the claim taken above, otherwise the id stays in the submitted map forever.
    submittedRegionProcedures.remove(procId);
    LOG.warn("Received procedure pid={}, which already executed, just ignore it", procId);
    return false;
  }
  return true;
}

/**
* Marks an open/close region procedure as executed: records its id in the executed procedures
* cache and removes it from the submitted procedures map.
* See {@link #submitRegionProcedure(long)}.
* @param procId the id of the open/close region procedure
*/
public void finishRegionProcedure(long procId) {
// Record the id as executed before removing it from the submitted map. submitRegionProcedure
// checks the submitted map first and the executed cache second, so with this order a duplicate
// request racing with this call should find the id in at least one of the two.
executedRegionProcedures.put(procId, procId);
submittedRegionProcedures.remove(procId);
}

public boolean isShutDown() {
return shutDown;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3721,8 +3721,12 @@ private void executeOpenRegionProcedures(OpenRegionRequest request,
regionServer.updateRegionFavoredNodesMapping(regionInfo.getEncodedName(),
regionOpenInfo.getFavoredNodesList());
}
regionServer.executorService.submit(AssignRegionHandler.create(regionServer, regionInfo,
regionOpenInfo.getOpenProcId(), tableDesc, masterSystemTime));
long procId = regionOpenInfo.getOpenProcId();
if (regionServer.submitRegionProcedure(procId)) {
regionServer.executorService.submit(AssignRegionHandler
.create(regionServer, regionInfo, procId, tableDesc,
masterSystemTime));
}
}
}

Expand All @@ -3733,11 +3737,14 @@ private void executeCloseRegionProcedures(CloseRegionRequest request) {
} catch (DoNotRetryIOException e) {
throw new UncheckedIOException("Should not happen", e);
}
ServerName destination =
request.hasDestinationServer() ? ProtobufUtil.toServerName(request.getDestinationServer())
: null;
regionServer.executorService.submit(UnassignRegionHandler.create(regionServer, encodedName,
request.getCloseProcId(), false, destination));
ServerName destination = request.hasDestinationServer() ?
ProtobufUtil.toServerName(request.getDestinationServer()) :
null;
long procId = request.getCloseProcId();
if (regionServer.submitRegionProcedure(procId)) {
regionServer.executorService.submit(UnassignRegionHandler
.create(regionServer, encodedName, procId, false, destination));
}
}

private void executeProcedures(RemoteProcedureRequest request) {
Expand Down
Loading

0 comments on commit bdd2fc6

Please sign in to comment.