HIVE-9281 : Code cleanup [Spark Branch] (Szehon, reviewed by Xuefu)
git-svn-id: https://svn.apache.org/repos/asf/hive/branches/spark@1650201 13f79535-47bb-0310-9956-ffa450edef68
Szehon Ho committed Jan 8, 2015
1 parent 10e7dbb commit 153e9ec
Showing 83 changed files with 603 additions and 719 deletions.
@@ -124,7 +124,7 @@ public static void afterTest() throws Exception {
}

/**
* Verify that the connection to HS2 with MiniMr is successful
* Verify that the connection to HS2 with MiniMr is successful.
* @throws Exception
*/
@Test
@@ -134,7 +134,7 @@ public void testConnection() throws Exception {
}

/**
* Run nonMr query
* Run nonMr query.
* @throws Exception
*/
@Test
@@ -147,15 +147,15 @@ public void testNonSparkQuery() throws Exception {
}

/**
* Run nonMr query
* Run nonMr query.
* @throws Exception
*/
@Test
public void testSparkQuery() throws Exception {
String tableName = "testTab2";
String resultVal = "val_238";
String queryStr = "SELECT * FROM " + tableName +
" where value = '" + resultVal + "'";
String queryStr = "SELECT * FROM " + tableName
+ " where value = '" + resultVal + "'";

testKvQuery(tableName, queryStr, resultVal);
}
@@ -233,8 +233,8 @@ public void testTempTable() throws Exception {
+ dataFilePath.toString() + "' into table " + tempTableName);

String resultVal = "val_238";
String queryStr = "SELECT * FROM " + tempTableName +
" where value = '" + resultVal + "'";
String queryStr = "SELECT * FROM " + tempTableName
+ " where value = '" + resultVal + "'";
verifyResult(queryStr, resultVal, 2);

// A second connection should not be able to see the table
@@ -244,8 +244,7 @@ public void testTempTable() throws Exception {
stmt2.execute("USE " + dbName);
boolean gotException = false;
try {
ResultSet res;
res = stmt2.executeQuery(queryStr);
stmt2.executeQuery(queryStr);
} catch (SQLException err) {
// This is expected to fail.
assertTrue("Expecting table not found error, instead got: " + err,
@@ -266,7 +265,7 @@ private void checkForNotExist(ResultSet res) throws Exception {
}

/**
* Verify if the given property contains the expected value
* Verify if the given property contains the expected value.
* @param propertyName
* @param expectedValue
* @throws Exception
@@ -275,7 +274,7 @@ private void verifyProperty(String propertyName, String expectedValue) throws Ex
Statement stmt = hs2Conn .createStatement();
ResultSet res = stmt.executeQuery("set " + propertyName);
assertTrue(res.next());
String results[] = res.getString(1).split("=");
String[] results = res.getString(1).split("=");
assertEquals("Property should be set", results.length, 2);
assertEquals("Property should be set", expectedValue, results[1]);
}
@@ -44,6 +44,7 @@

public class SparkHashTableSinkOperator
extends TerminalOperator<SparkHashTableSinkDesc> implements Serializable {
private static final int MIN_REPLICATION = 10;
private static final long serialVersionUID = 1L;
private final String CLASS_NAME = this.getClass().getName();
private final PerfLogger perfLogger = PerfLogger.getPerfLogger();
@@ -122,7 +123,6 @@ protected void flushToFile(MapJoinPersistableTableContainer tableContainer,
+ "-" + Math.abs(Utilities.randGen.nextInt()));
try {
// This will guarantee file name uniqueness.
// TODO: can we use the task id, which should be unique
if (fs.createNewFile(path)) {
break;
}
@@ -131,10 +131,10 @@ protected void flushToFile(MapJoinPersistableTableContainer tableContainer,
}
// TODO find out numOfPartitions for the big table
int numOfPartitions = replication;
replication = (short)Math.min(10, numOfPartitions);
replication = (short) Math.min(MIN_REPLICATION, numOfPartitions);
}
htsOperator.console.printInfo(Utilities.now() + "\tDump the side-table for tag: " + tag +
" with group count: " + tableContainer.size() + " into file: " + path);
htsOperator.console.printInfo(Utilities.now() + "\tDump the side-table for tag: " + tag
+ " with group count: " + tableContainer.size() + " into file: " + path);
// get the hashtable file and path
// get the hashtable file and path
OutputStream os = null;
@@ -153,8 +153,8 @@ protected void flushToFile(MapJoinPersistableTableContainer tableContainer,
}
tableContainer.clear();
FileStatus status = fs.getFileStatus(path);
htsOperator.console.printInfo(Utilities.now() + "\tUploaded 1 File to: " + path +
" (" + status.getLen() + " bytes)");
htsOperator.console.printInfo(Utilities.now() + "\tUploaded 1 File to: " + path
+ " (" + status.getLen() + " bytes)");
perfLogger.PerfLogEnd(CLASS_NAME, PerfLogger.SPARK_FLUSH_HASHTABLE + this.getName());
}

@@ -101,7 +101,7 @@ public void load(
bigInputPath = null;
} else {
Set<String> aliases =
((SparkBucketMapJoinContext)mapJoinCtx).getPosToAliasMap().get(pos);
((SparkBucketMapJoinContext) mapJoinCtx).getPosToAliasMap().get(pos);
String alias = aliases.iterator().next();
// Any one small table input path
String smallInputPath =
@@ -110,7 +110,7 @@ public void load(
}
}
String fileName = localWork.getBucketFileName(bigInputPath);
Path path = Utilities.generatePath(baseDir, desc.getDumpFilePrefix(), (byte)pos, fileName);
Path path = Utilities.generatePath(baseDir, desc.getDumpFilePrefix(), (byte) pos, fileName);
LOG.info("\tLoad back all hashtable files from tmp folder uri:" + path);
mapJoinTables[pos] = mapJoinTableSerdes[pos].load(fs, path);
}
@@ -70,13 +70,15 @@ public void collect(HiveKey key, BytesWritable value) throws IOException {
/** Process the given record. */
protected abstract void processNextRecord(T inputRecord) throws IOException;

/** Is the current state of the record processor done? */
/**
* @return true if current state of the record processor is done.
*/
protected abstract boolean processingDone();

/** Close the record processor */
/** Close the record processor. */
protected abstract void closeRecordProcessor();

/** Implement Iterator interface */
/** Implement Iterator interface. */
public class ResultIterator implements Iterator {
@Override
public boolean hasNext(){
@@ -98,8 +100,7 @@ public boolean hasNext(){
return true;
}
} catch (IOException ex) {
// TODO: better handling of exception.
throw new RuntimeException("Error while processing input.", ex);
throw new IllegalStateException("Error while processing input.", ex);
}
}

@@ -79,7 +79,7 @@ private static RowContainer initRowContainer(Configuration conf) {

container.setSerDe(serDe, oi);
container.setTableDesc(tableDesc);
} catch(Exception ex) {
} catch (Exception ex) {
throw new RuntimeException("Failed to create RowContainer", ex);
}
return container;
@@ -114,7 +114,7 @@ public synchronized void clear() {
}
try {
container.clearRows();
} catch(HiveException ex) {
} catch (HiveException ex) {
throw new RuntimeException("Failed to clear rows in RowContainer", ex);
}
cursor = 0;
@@ -22,6 +22,7 @@
import org.apache.hadoop.hive.ql.io.HiveKey;
import org.apache.hadoop.hive.ql.io.merge.MergeFileMapper;
import org.apache.hadoop.io.BytesWritable;

import scala.Tuple2;

import java.util.Iterator;
@@ -35,6 +36,7 @@ public HiveMapFunction(byte[] jobConfBuffer, SparkReporter sparkReporter) {
super(jobConfBuffer, sparkReporter);
}

@SuppressWarnings("unchecked")
@Override
public Iterable<Tuple2<HiveKey, BytesWritable>>
call(Iterator<Tuple2<BytesWritable, BytesWritable>> it) throws Exception {
@@ -50,7 +52,6 @@ public HiveMapFunction(byte[] jobConfBuffer, SparkReporter sparkReporter) {
}

HiveMapFunctionResultList result = new HiveMapFunctionResultList(jobConf, it, mapRecordHandler);
//TODO we need to implement a Spark specified Reporter to collect stats, refer to HIVE-7709.
mapRecordHandler.init(jobConf, result, sparkReporter);

return result;
@@ -19,7 +19,6 @@

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.BytesWritable;
import org.apache.hadoop.mapred.Reporter;
import scala.Tuple2;

import java.io.IOException;
@@ -32,6 +31,7 @@ public class HiveMapFunctionResultList extends
/**
* Instantiate result set Iterable for Map function output.
*
* @param conf Hive configuration.
* @param inputIterator Input record iterator.
* @param handler Initialized {@link SparkMapRecordHandler} instance.
*/
@@ -26,13 +26,13 @@


public abstract class HivePairFlatMapFunction<T, K, V> implements PairFlatMapFunction<T, K, V> {
private static final NumberFormat taskIdFormat = NumberFormat.getInstance();
private static final NumberFormat stageIdFormat = NumberFormat.getInstance();
private static final NumberFormat TASK_ID_FORMAT = NumberFormat.getInstance();
private static final NumberFormat STAGE_ID_FORMAT = NumberFormat.getInstance();
static {
taskIdFormat.setGroupingUsed(false);
taskIdFormat.setMinimumIntegerDigits(6);
stageIdFormat.setGroupingUsed(false);
stageIdFormat.setMinimumIntegerDigits(4);
TASK_ID_FORMAT.setGroupingUsed(false);
TASK_ID_FORMAT.setMinimumIntegerDigits(6);
STAGE_ID_FORMAT.setGroupingUsed(false);
STAGE_ID_FORMAT.setMinimumIntegerDigits(4);
}

protected transient JobConf jobConf;
@@ -60,7 +60,7 @@ private void setupMRLegacyConfigs() {
StringBuilder taskAttemptIdBuilder = new StringBuilder("attempt_");
taskAttemptIdBuilder.append(System.currentTimeMillis())
.append("_")
.append(stageIdFormat.format(TaskContext.get().stageId()))
.append(STAGE_ID_FORMAT.format(TaskContext.get().stageId()))
.append("_");

if (isMap()) {
@@ -71,7 +71,7 @@

// Spark task attempt id is increased by Spark context instead of task, which may introduce
// unstable qtest output, since non Hive features depends on this, we always set it to 0 here.
taskAttemptIdBuilder.append(taskIdFormat.format(TaskContext.get().partitionId()))
taskAttemptIdBuilder.append(TASK_ID_FORMAT.format(TaskContext.get().partitionId()))
.append("_0");

String taskAttemptIdStr = taskAttemptIdBuilder.toString();
@@ -20,6 +20,7 @@

import org.apache.hadoop.hive.ql.io.HiveKey;
import org.apache.hadoop.io.BytesWritable;

import scala.Tuple2;

import java.util.Iterator;
@@ -33,6 +34,7 @@ public HiveReduceFunction(byte[] buffer, SparkReporter sparkReporter) {
super(buffer, sparkReporter);
}

@SuppressWarnings("unchecked")
@Override
public Iterable<Tuple2<HiveKey, BytesWritable>>
call(Iterator<Tuple2<HiveKey, Iterable<BytesWritable>>> it) throws Exception {
@@ -32,6 +32,7 @@ public class HiveReduceFunctionResultList extends
/**
* Instantiate result set Iterable for Reduce function output.
*
* @param conf Hive configuration.
* @param inputIterator Input record iterator.
* @param reducer Initialized {@link org.apache.hadoop.hive.ql.exec.mr.ExecReducer} instance.
*/
@@ -34,12 +34,15 @@ public interface HiveSparkClient extends Serializable, Closeable {
* @return SparkJobRef could be used to track spark job progress and metrics.
* @throws Exception
*/
public SparkJobRef execute(DriverContext driverContext, SparkWork sparkWork) throws Exception;
SparkJobRef execute(DriverContext driverContext, SparkWork sparkWork) throws Exception;

public SparkConf getSparkConf();
/**
* @return spark configuration
*/
SparkConf getSparkConf();

/**
* Get the count of executors
* @return the number of executors
*/
public int getExecutorCount() throws Exception;
int getExecutorCount() throws Exception;
}
@@ -33,7 +33,7 @@
import org.apache.spark.SparkException;

public class HiveSparkClientFactory {
protected static transient final Log LOG = LogFactory.getLog(HiveSparkClientFactory.class);
protected static final transient Log LOG = LogFactory.getLog(HiveSparkClientFactory.class);

private static final String SPARK_DEFAULT_CONF_FILE = "spark-defaults.conf";
private static final String SPARK_DEFAULT_MASTER = "local";
@@ -27,10 +27,8 @@
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.hive.ql.exec.Utilities;
import org.apache.hadoop.hive.ql.exec.mr.ExecMapper;
import org.apache.hadoop.mapred.JobConf;

import com.esotericsoftware.kryo.Kryo;
import com.esotericsoftware.kryo.io.Input;
import com.esotericsoftware.kryo.io.Output;

@@ -56,7 +56,7 @@ public class LocalHiveSparkClient implements HiveSparkClient {
private static final long serialVersionUID = 1L;

private static final String MR_JAR_PROPERTY = "tmpjars";
protected static transient final Log LOG = LogFactory
protected static final transient Log LOG = LogFactory
.getLog(LocalHiveSparkClient.class);

private static final Splitter CSV_SPLITTER = Splitter.on(",").omitEmptyStrings();
@@ -138,7 +138,7 @@ public SparkJobRef execute(DriverContext driverContext, SparkWork sparkWork) thr
* At this point single SparkContext is used by more than one thread, so make this
* method synchronized.
*
* TODO: This method can't remove a jar/resource from SparkContext. Looks like this is an
* This method can't remove a jar/resource from SparkContext. Looks like this is an
* issue we have to live with until multiple SparkContexts are supported in a single JVM.
*/
private synchronized void refreshLocalResources(SparkWork sparkWork, HiveConf conf) {
@@ -64,10 +64,10 @@ public class RemoteHiveSparkClient implements HiveSparkClient {
private static final long serialVersionUID = 1L;

private static final String MR_JAR_PROPERTY = "tmpjars";
protected static transient final Log LOG = LogFactory
protected static final transient Log LOG = LogFactory
.getLog(RemoteHiveSparkClient.class);

private static transient final Splitter CSV_SPLITTER = Splitter.on(",").omitEmptyStrings();
private static final transient Splitter CSV_SPLITTER = Splitter.on(",").omitEmptyStrings();

private transient SparkClient remoteClient;
private transient SparkConf sparkConf;