Skip to content

Commit

Permalink
Fixing MR intermediate spills. Contributed by Arun Suresh.
Browse files Browse the repository at this point in the history
  • Loading branch information
vinoduec committed May 14, 2015
1 parent 53fe4ef commit 6b710a4
Show file tree
Hide file tree
Showing 13 changed files with 156 additions and 41 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -83,6 +83,7 @@ public class LocalContainerLauncher extends AbstractService implements
private final ClassLoader jobClassLoader;
private ExecutorService taskRunner;
private Thread eventHandler;
private byte[] encryptedSpillKey = new byte[] {0};
private BlockingQueue<ContainerLauncherEvent> eventQueue =
new LinkedBlockingQueue<ContainerLauncherEvent>();

Expand Down Expand Up @@ -176,6 +177,11 @@ public void handle(ContainerLauncherEvent event) {
}
}

public void setEncryptedSpillKey(byte[] encryptedSpillKey) {
if (encryptedSpillKey != null) {
this.encryptedSpillKey = encryptedSpillKey;
}
}

/*
* Uber-AM lifecycle/ordering ("normal" case):
Expand Down Expand Up @@ -382,6 +388,10 @@ private void runSubtask(org.apache.hadoop.mapred.Task task,
// map to handle)
conf.setBoolean("mapreduce.task.uberized", true);

// Check and handle Encrypted spill key
task.setEncryptedSpillKey(encryptedSpillKey);
YarnChild.setEncryptedSpillKeyIfRequired(task);

// META-FIXME: do we want the extra sanity-checking (doneWithMaps,
// etc.), or just assume/hope the state machine(s) and uber-AM work
// as expected?
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -84,20 +84,30 @@ public class TaskAttemptListenerImpl extends CompositeService
jvmIDToActiveAttemptMap
= new ConcurrentHashMap<WrappedJvmID, org.apache.hadoop.mapred.Task>();
private Set<WrappedJvmID> launchedJVMs = Collections
.newSetFromMap(new ConcurrentHashMap<WrappedJvmID, Boolean>());
.newSetFromMap(new ConcurrentHashMap<WrappedJvmID, Boolean>());

private JobTokenSecretManager jobTokenSecretManager = null;
private AMPreemptionPolicy preemptionPolicy;

private byte[] encryptedSpillKey;

public TaskAttemptListenerImpl(AppContext context,
JobTokenSecretManager jobTokenSecretManager,
RMHeartbeatHandler rmHeartbeatHandler,
AMPreemptionPolicy preemptionPolicy) {
this(context, jobTokenSecretManager, rmHeartbeatHandler,
preemptionPolicy, null);
}

public TaskAttemptListenerImpl(AppContext context,
JobTokenSecretManager jobTokenSecretManager,
RMHeartbeatHandler rmHeartbeatHandler,
AMPreemptionPolicy preemptionPolicy, byte[] secretShuffleKey) {
super(TaskAttemptListenerImpl.class.getName());
this.context = context;
this.jobTokenSecretManager = jobTokenSecretManager;
this.rmHeartbeatHandler = rmHeartbeatHandler;
this.preemptionPolicy = preemptionPolicy;
this.encryptedSpillKey = secretShuffleKey;
}

@Override
Expand Down Expand Up @@ -484,6 +494,7 @@ public JvmTask getTask(JvmContext context) throws IOException {
jvmIDToActiveAttemptMap.remove(wJvmID);
launchedJVMs.remove(wJvmID);
LOG.info("JVM with ID: " + jvmId + " given task: " + task.getTaskID());
task.setEncryptedSpillKey(encryptedSpillKey);
jvmTask = new JvmTask(task, false);
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -165,6 +165,7 @@ public TaskUmbilicalProtocol run() throws Exception {
@Override
public Object run() throws Exception {
// use job-specified working directory
setEncryptedSpillKeyIfRequired(taskFinal);
FileSystem.get(job).setWorkingDirectory(job.getWorkingDirectory());
taskFinal.run(job, umbilical); // run the task
return null;
Expand Down Expand Up @@ -223,6 +224,23 @@ public Object run() throws Exception {
}
}

/**
* Utility method to check if the Encrypted Spill Key needs to be set into the
* user credentials of the user running the Map / Reduce Task
* @param task The Map / Reduce task to set the Encrypted Spill information in
* @throws Exception
*/
public static void setEncryptedSpillKeyIfRequired(Task task) throws
Exception {
if ((task != null) && (task.getEncryptedSpillKey() != null) && (task
.getEncryptedSpillKey().length > 1)) {
Credentials creds =
UserGroupInformation.getCurrentUser().getCredentials();
TokenCache.setEncryptedSpillKey(task.getEncryptedSpillKey(), creds);
UserGroupInformation.getCurrentUser().addCredentials(creds);
}
}

/**
* Configure mapred-local dirs. This config is used by the task for finding
* out an output directory.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import java.io.IOException;
import java.lang.reflect.Constructor;
import java.lang.reflect.InvocationTargetException;
import java.security.NoSuchAlgorithmException;
import java.security.PrivilegedExceptionAction;
import java.util.ArrayList;
import java.util.HashMap;
Expand All @@ -47,6 +48,7 @@
import org.apache.hadoop.mapred.TaskAttemptListenerImpl;
import org.apache.hadoop.mapred.TaskLog;
import org.apache.hadoop.mapred.TaskUmbilicalProtocol;
import org.apache.hadoop.mapreduce.CryptoUtils;
import org.apache.hadoop.mapreduce.JobContext;
import org.apache.hadoop.mapreduce.MRJobConfig;
import org.apache.hadoop.mapreduce.OutputCommitter;
Expand Down Expand Up @@ -148,6 +150,8 @@

import com.google.common.annotations.VisibleForTesting;

import javax.crypto.KeyGenerator;

/**
* The Map-Reduce Application Master.
* The state machine is encapsulated in the implementation of Job interface.
Expand Down Expand Up @@ -175,6 +179,7 @@ public class MRAppMaster extends CompositeService {
* Priority of the MRAppMaster shutdown hook.
*/
public static final int SHUTDOWN_HOOK_PRIORITY = 30;
public static final String INTERMEDIATE_DATA_ENCRYPTION_ALGO = "HmacSHA1";

private Clock clock;
private final long startTime;
Expand Down Expand Up @@ -206,6 +211,7 @@ public class MRAppMaster extends CompositeService {
private JobHistoryEventHandler jobHistoryEventHandler;
private SpeculatorEventDispatcher speculatorEventDispatcher;
private AMPreemptionPolicy preemptionPolicy;
private byte[] encryptedSpillKey;

// After a task attempt completes from TaskUmbilicalProtocol's point of view,
// it will be transitioned to finishing state.
Expand Down Expand Up @@ -704,8 +710,22 @@ protected void initJobCredentialsAndUGI(Configuration conf) {
try {
this.currentUser = UserGroupInformation.getCurrentUser();
this.jobCredentials = ((JobConf)conf).getCredentials();
if (CryptoUtils.isEncryptedSpillEnabled(conf)) {
int keyLen = conf.getInt(
MRJobConfig.MR_ENCRYPTED_INTERMEDIATE_DATA_KEY_SIZE_BITS,
MRJobConfig
.DEFAULT_MR_ENCRYPTED_INTERMEDIATE_DATA_KEY_SIZE_BITS);
KeyGenerator keyGen =
KeyGenerator.getInstance(INTERMEDIATE_DATA_ENCRYPTION_ALGO);
keyGen.init(keyLen);
encryptedSpillKey = keyGen.generateKey().getEncoded();
} else {
encryptedSpillKey = new byte[] {0};
}
} catch (IOException e) {
throw new YarnRuntimeException(e);
} catch (NoSuchAlgorithmException e) {
throw new YarnRuntimeException(e);
}
}

Expand Down Expand Up @@ -762,7 +782,7 @@ protected TaskAttemptListener createTaskAttemptListener(AppContext context,
AMPreemptionPolicy preemptionPolicy) {
TaskAttemptListener lis =
new TaskAttemptListenerImpl(context, jobTokenSecretManager,
getRMHeartbeatHandler(), preemptionPolicy);
getRMHeartbeatHandler(), preemptionPolicy, encryptedSpillKey);
return lis;
}

Expand Down Expand Up @@ -929,6 +949,8 @@ protected void serviceStart() throws Exception {
if (job.isUber()) {
this.containerLauncher = new LocalContainerLauncher(context,
(TaskUmbilicalProtocol) taskAttemptListener, jobClassLoader);
((LocalContainerLauncher) this.containerLauncher)
.setEncryptedSpillKey(encryptedSpillKey);
} else {
this.containerLauncher = new ContainerLauncherImpl(context);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -149,6 +149,8 @@ static synchronized String getOutputName(int partition) {
private String user; // user running the job
private TaskAttemptID taskId; // unique, includes job id
private int partition; // id within job
private byte[] encryptedSpillKey = new byte[] {0}; // Key Used to encrypt
// intermediate spills
TaskStatus taskStatus; // current status of the task
protected JobStatus.State jobRunStateForCleanup;
protected boolean jobCleanup = false;
Expand Down Expand Up @@ -262,6 +264,24 @@ public void setJobTokenSecret(SecretKey tokenSecret) {
this.tokenSecret = tokenSecret;
}

/**
* Get Encrypted spill key
* @return encrypted spill key
*/
public byte[] getEncryptedSpillKey() {
return encryptedSpillKey;
}

/**
* Set Encrypted spill key
* @param encryptedSpillKey key
*/
public void setEncryptedSpillKey(byte[] encryptedSpillKey) {
if (encryptedSpillKey != null) {
this.encryptedSpillKey = encryptedSpillKey;
}
}

/**
* Get the job token secret
* @return the token secret
Expand Down Expand Up @@ -492,6 +512,8 @@ public void write(DataOutput out) throws IOException {
out.writeBoolean(writeSkipRecs);
out.writeBoolean(taskCleanup);
Text.writeString(out, user);
out.writeInt(encryptedSpillKey.length);
out.write(encryptedSpillKey);
extraData.write(out);
}

Expand All @@ -517,6 +539,9 @@ public void readFields(DataInput in) throws IOException {
setPhase(TaskStatus.Phase.CLEANUP);
}
user = StringInterner.weakIntern(Text.readString(in));
int len = in.readInt();
encryptedSpillKey = new byte[len];
in.readFully(encryptedSpillKey);
extraData.readFields(in);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,6 @@
import org.apache.hadoop.fs.crypto.CryptoFSDataInputStream;
import org.apache.hadoop.fs.crypto.CryptoFSDataOutputStream;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.mapreduce.MRJobConfig;
import org.apache.hadoop.mapreduce.security.TokenCache;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.util.LimitInputStream;
Expand All @@ -50,7 +49,7 @@ public class CryptoUtils {

private static final Log LOG = LogFactory.getLog(CryptoUtils.class);

public static boolean isShuffleEncrypted(Configuration conf) {
public static boolean isEncryptedSpillEnabled(Configuration conf) {
return conf.getBoolean(MRJobConfig.MR_ENCRYPTED_INTERMEDIATE_DATA,
MRJobConfig.DEFAULT_MR_ENCRYPTED_INTERMEDIATE_DATA);
}
Expand All @@ -64,7 +63,7 @@ public static boolean isShuffleEncrypted(Configuration conf) {
*/
public static byte[] createIV(Configuration conf) throws IOException {
CryptoCodec cryptoCodec = CryptoCodec.getInstance(conf);
if (isShuffleEncrypted(conf)) {
if (isEncryptedSpillEnabled(conf)) {
byte[] iv = new byte[cryptoCodec.getCipherSuite().getAlgorithmBlockSize()];
cryptoCodec.generateSecureRandom(iv);
return iv;
Expand All @@ -75,13 +74,13 @@ public static byte[] createIV(Configuration conf) throws IOException {

public static int cryptoPadding(Configuration conf) {
// Sizeof(IV) + long(start-offset)
return isShuffleEncrypted(conf) ? CryptoCodec.getInstance(conf)
return isEncryptedSpillEnabled(conf) ? CryptoCodec.getInstance(conf)
.getCipherSuite().getAlgorithmBlockSize() + 8 : 0;
}

private static byte[] getEncryptionKey() throws IOException {
return TokenCache.getShuffleSecretKey(UserGroupInformation.getCurrentUser()
.getCredentials());
return TokenCache.getEncryptedSpillKey(UserGroupInformation.getCurrentUser()
.getCredentials());
}

private static int getBufferSize(Configuration conf) {
Expand All @@ -102,7 +101,7 @@ private static int getBufferSize(Configuration conf) {
*/
public static FSDataOutputStream wrapIfNecessary(Configuration conf,
FSDataOutputStream out) throws IOException {
if (isShuffleEncrypted(conf)) {
if (isEncryptedSpillEnabled(conf)) {
out.write(ByteBuffer.allocate(8).putLong(out.getPos()).array());
byte[] iv = createIV(conf);
out.write(iv);
Expand Down Expand Up @@ -137,7 +136,7 @@ public static FSDataOutputStream wrapIfNecessary(Configuration conf,
*/
public static InputStream wrapIfNecessary(Configuration conf, InputStream in,
long length) throws IOException {
if (isShuffleEncrypted(conf)) {
if (isEncryptedSpillEnabled(conf)) {
int bufferSize = getBufferSize(conf);
if (length > -1) {
in = new LimitInputStream(in, length);
Expand Down Expand Up @@ -174,7 +173,7 @@ public static InputStream wrapIfNecessary(Configuration conf, InputStream in,
*/
public static FSDataInputStream wrapIfNecessary(Configuration conf,
FSDataInputStream in) throws IOException {
if (isShuffleEncrypted(conf)) {
if (isEncryptedSpillEnabled(conf)) {
CryptoCodec cryptoCodec = CryptoCodec.getInstance(conf);
int bufferSize = getBufferSize(conf);
// Not going to be used... but still has to be read...
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,12 +18,10 @@
package org.apache.hadoop.mapreduce;

import java.io.File;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.net.InetAddress;
import java.net.URI;
import java.net.URISyntaxException;
import java.net.UnknownHostException;
import java.security.NoSuchAlgorithmException;
import java.util.ArrayList;
import java.util.Arrays;
Expand All @@ -42,7 +40,6 @@
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileContext;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.FileUtil;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.permission.FsPermission;
import org.apache.hadoop.io.Text;
Expand All @@ -52,7 +49,6 @@
import static org.apache.hadoop.mapred.QueueManager.toFullPropertyName;

import org.apache.hadoop.mapreduce.counters.Limits;
import org.apache.hadoop.mapreduce.filecache.ClientDistributedCacheManager;
import org.apache.hadoop.mapreduce.filecache.DistributedCache;
import org.apache.hadoop.mapreduce.protocol.ClientProtocol;
import org.apache.hadoop.mapreduce.security.TokenCache;
Expand Down Expand Up @@ -176,20 +172,20 @@ JobStatus submitJobInternal(Job job, Cluster cluster)
if (TokenCache.getShuffleSecretKey(job.getCredentials()) == null) {
KeyGenerator keyGen;
try {

int keyLen = CryptoUtils.isShuffleEncrypted(conf)
? conf.getInt(MRJobConfig.MR_ENCRYPTED_INTERMEDIATE_DATA_KEY_SIZE_BITS,
MRJobConfig.DEFAULT_MR_ENCRYPTED_INTERMEDIATE_DATA_KEY_SIZE_BITS)
: SHUFFLE_KEY_LENGTH;
keyGen = KeyGenerator.getInstance(SHUFFLE_KEYGEN_ALGORITHM);
keyGen.init(keyLen);
keyGen.init(SHUFFLE_KEY_LENGTH);
} catch (NoSuchAlgorithmException e) {
throw new IOException("Error generating shuffle secret key", e);
}
SecretKey shuffleKey = keyGen.generateKey();
TokenCache.setShuffleSecretKey(shuffleKey.getEncoded(),
job.getCredentials());
}
if (CryptoUtils.isEncryptedSpillEnabled(conf)) {
conf.setInt(MRJobConfig.MR_AM_MAX_ATTEMPTS, 1);
LOG.warn("Max job attempts set to 1 since encrypted intermediate" +
"data spill is enabled");
}

copyAndConfigureFiles(job, submitJobDir);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -176,6 +176,7 @@ private static void mergeBinaryTokens(Credentials creds, Configuration conf) {
public static final String JOB_TOKENS_FILENAME = "mapreduce.job.jobTokenFile";
private static final Text JOB_TOKEN = new Text("JobToken");
private static final Text SHUFFLE_TOKEN = new Text("MapReduceShuffleToken");
private static final Text ENC_SPILL_KEY = new Text("MapReduceEncryptedSpillKey");

/**
* load job token from a file
Expand Down Expand Up @@ -244,6 +245,15 @@ public static byte[] getShuffleSecretKey(Credentials credentials) {
return getSecretKey(credentials, SHUFFLE_TOKEN);
}

@InterfaceAudience.Private
public static void setEncryptedSpillKey(byte[] key, Credentials credentials) {
credentials.addSecretKey(ENC_SPILL_KEY, key);
}

@InterfaceAudience.Private
public static byte[] getEncryptedSpillKey(Credentials credentials) {
return getSecretKey(credentials, ENC_SPILL_KEY);
}
/**
* @deprecated Use {@link Credentials#getToken(org.apache.hadoop.io.Text)}
* instead, this method is included for compatibility against Hadoop-1
Expand Down
Loading

0 comments on commit 6b710a4

Please sign in to comment.