HBASE-20586: add support for clusters on different realms (with cross-realm authentication)

Signed-off-by: Andrew Purtell <[email protected]>
wellington authored and apurtell committed Apr 15, 2019
1 parent 88de644 commit cd61bcc
Showing 1 changed file with 19 additions and 3 deletions.
SyncTable.java
@@ -44,6 +44,7 @@
 import org.apache.hadoop.mapreduce.Counters;
 import org.apache.hadoop.mapreduce.Job;
 import org.apache.hadoop.mapreduce.lib.output.NullOutputFormat;
+import org.apache.hadoop.mapreduce.security.TokenCache;
 import org.apache.hadoop.util.GenericOptionsParser;
 import org.apache.hadoop.util.Tool;
 import org.apache.hadoop.util.ToolRunner;
@@ -83,12 +84,28 @@ public SyncTable(Configuration conf) {
     super(conf);
   }

+  private void initCredentialsForHBase(String zookeeper, Job job) throws IOException {
+    Configuration peerConf = HBaseConfiguration.createClusterConf(job
+        .getConfiguration(), zookeeper);
+    if(peerConf.get("hbase.security.authentication").equals("kerberos")){
+      TableMapReduceUtil.initCredentialsForCluster(job, peerConf);
+    }
+  }
+
   public Job createSubmittableJob(String[] args) throws IOException {
     FileSystem fs = sourceHashDir.getFileSystem(getConf());
     if (!fs.exists(sourceHashDir)) {
       throw new IOException("Source hash dir not found: " + sourceHashDir);
     }

+    Job job = Job.getInstance(getConf(),getConf().get("mapreduce.job.name",
+        "syncTable_" + sourceTableName + "-" + targetTableName));
+    Configuration jobConf = job.getConfiguration();
+    if (jobConf.get("hadoop.security.authentication").equals("kerberos")) {
+      TokenCache.obtainTokensForNamenodes(job.getCredentials(), new
+        Path[] { sourceHashDir }, getConf());
+    }
+
     HashTable.TableHash tableHash = HashTable.TableHash.read(getConf(), sourceHashDir);
     LOG.info("Read source hash manifest: " + tableHash);
     LOG.info("Read " + tableHash.partitions.size() + " partition keys");
@@ -118,18 +135,17 @@ public Job createSubmittableJob(String[] args) throws IOException {
           + " found in the partitions file is " + tableHash.partitions.size());
     }

-    Job job = Job.getInstance(getConf(),getConf().get("mapreduce.job.name",
-        "syncTable_" + sourceTableName + "-" + targetTableName));
-    Configuration jobConf = job.getConfiguration();
     job.setJarByClass(HashTable.class);
     jobConf.set(SOURCE_HASH_DIR_CONF_KEY, sourceHashDir.toString());
     jobConf.set(SOURCE_TABLE_CONF_KEY, sourceTableName);
     jobConf.set(TARGET_TABLE_CONF_KEY, targetTableName);
     if (sourceZkCluster != null) {
       jobConf.set(SOURCE_ZK_CLUSTER_CONF_KEY, sourceZkCluster);
+      initCredentialsForHBase(sourceZkCluster, job);
     }
     if (targetZkCluster != null) {
       jobConf.set(TARGET_ZK_CLUSTER_CONF_KEY, targetZkCluster);
+      initCredentialsForHBase(targetZkCluster, job);
     }
     jobConf.setBoolean(DRY_RUN_CONF_KEY, dryRun);
     jobConf.setBoolean(DO_DELETES_CONF_KEY, doDeletes);
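
Taken together, the change does two things when security is enabled: it calls TokenCache.obtainTokensForNamenodes so the job carries an HDFS delegation token for the source hash directory, and it calls TableMapReduceUtil.initCredentialsForCluster for each peer cluster passed on the command line whose configuration sets hbase.security.authentication to kerberos, so the job can authenticate to an HBase cluster in a different Kerberos realm. As a rough usage sketch (not part of this commit), a SyncTable run from the target cluster against a secure source cluster in another realm might look like the following; the hostnames, ZooKeeper quorum string, and hash directory are illustrative only, and cross-realm trust between the two KDCs is assumed to be configured already:

  hbase org.apache.hadoop.hbase.mapreduce.SyncTable \
    --sourcezkcluster=zk1.source.example.com:2181:/hbase \
    hdfs://source-cluster/hashes/TestTable TestTable TestTable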
