Skip to content

Commit

Permalink
HIVE-25958: Optimise BasicStatsNoJobTask. (apache#3037). (Ayush Saxen…
Browse files Browse the repository at this point in the history
…a, reviewed by Rajesh Balamohan)
  • Loading branch information
ayushtkn committed Feb 23, 2022
1 parent 1c3ecf9 commit c97a491
Show file tree
Hide file tree
Showing 2 changed files with 122 additions and 17 deletions.
4 changes: 4 additions & 0 deletions common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
Original file line number Diff line number Diff line change
Expand Up @@ -753,6 +753,10 @@ public static enum ConfVars {
"Comma-separated list of statistics publishers to be invoked on counters on each job. \n" +
"A client stats publisher is specified as the name of a Java class which implements the \n" +
"org.apache.hadoop.hive.ql.stats.ClientStatsPublisher interface."),
BASICSTATSTASKSMAXTHREADSFACTOR("hive.basic.stats.max.threads.factor", 2, "Determines the maximum number of "
+ "threads that can be used for collection of file level statistics. If the value configured is x, then the "
+ "maximum number of threads that can be used is x multiplied by the number of available processors. A value"
+ " of less than 1, makes stats collection sequential."),
EXECPARALLEL("hive.exec.parallel", false, "Whether to execute jobs in parallel"),
EXECPARALLETHREADNUMBER("hive.exec.parallel.thread.number", 8,
"How many jobs at most can be executed in parallel"),
Expand Down
135 changes: 118 additions & 17 deletions ql/src/java/org/apache/hadoop/hive/ql/stats/BasicStatsNoJobTask.java
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,12 @@
import java.util.Collection;
import java.util.List;
import java.util.Map;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;

Expand Down Expand Up @@ -65,6 +70,7 @@
import com.google.common.collect.ImmutableListMultimap;
import com.google.common.collect.Lists;
import com.google.common.collect.Multimaps;
import com.google.common.util.concurrent.ThreadFactoryBuilder;

/**
* StatsNoJobTask is used in cases where stats collection is the only task for the given query (no
Expand Down Expand Up @@ -223,6 +229,18 @@ public void run() {
} else {
fileList = HiveStatsUtils.getFileStatusRecurse(dir, -1, fs);
}
ThreadPoolExecutor tpE = null;
List<Future<FileStats>> futures = null;
int numThreadsFactor = HiveConf.getIntVar(jc, HiveConf.ConfVars.BASICSTATSTASKSMAXTHREADSFACTOR);
if (fileList.size() > 1 && numThreadsFactor > 0) {
int numThreads = Math.min(fileList.size(), numThreadsFactor * Runtime.getRuntime().availableProcessors());
ThreadFactory threadFactory = new ThreadFactoryBuilder().setNameFormat("Basic-Stats-Thread-%d").build();
tpE = new ThreadPoolExecutor(numThreads, numThreads, 0, TimeUnit.SECONDS, new LinkedBlockingQueue<>(),
threadFactory);
tpE.allowsCoreThreadTimeOut();
futures = new ArrayList<>();
LOG.info("Processing Stats for {} file using {} threads", fileList.size(), numThreads);
}

for (FileStatus file : fileList) {
Utilities.FILE_OP_LOGGER.debug("Computing stats for {}", file);
Expand All @@ -232,28 +250,41 @@ public void run() {
if (file.getLen() == 0) {
numFiles += 1;
} else {
org.apache.hadoop.mapred.RecordReader<?, ?> recordReader = inputFormat.getRecordReader(dummySplit, jc, Reporter.NULL);
try {
if (recordReader instanceof StatsProvidingRecordReader) {
StatsProvidingRecordReader statsRR;
statsRR = (StatsProvidingRecordReader) recordReader;
rawDataSize += statsRR.getStats().getRawDataSize();
numRows += statsRR.getStats().getRowCount();
fileSize += file.getLen();
numFiles += 1;
if (file.isErasureCoded()) {
numErasureCodedFiles++;
}
} else {
throw new HiveException(String.format("Unexpected file found during reading footers for: %s ", file));
}
} finally {
recordReader.close();
FileStatProcessor fsp = new FileStatProcessor(file, inputFormat, dummySplit, jc);
if (tpE != null) {
futures.add(tpE.submit(fsp));
} else {
// No parallel processing, just call the method normally & update the stats.
FileStats fileStat = fsp.call();
rawDataSize += fileStat.getRawDataSize();
numRows += fileStat.getNumRows();
fileSize += fileStat.getFileSize();
numFiles += 1;
numErasureCodedFiles += fileStat.getNumErasureCodedFiles();
}
}
}
}

if (tpE != null) {
try {
for (Future<FileStats> future : futures) {
FileStats fileStat = future.get();
rawDataSize += fileStat.getRawDataSize();
numRows += fileStat.getNumRows();
fileSize += fileStat.getFileSize();
numFiles += 1;
numErasureCodedFiles += fileStat.getNumErasureCodedFiles();
}
} catch (Exception e) {
LOG.error("Encountered exception while collecting stats for file lists as {}", fileList, e);
// Cancel all the futures in the list & throw the caught exception post that.
futures.forEach(x -> x.cancel(true));
throw e;
} finally {
tpE.shutdown();
}
}
StatsSetupConst.setBasicStatsState(parameters, StatsSetupConst.TRUE);

parameters.put(StatsSetupConst.ROW_COUNT, String.valueOf(numRows));
Expand Down Expand Up @@ -446,4 +477,74 @@ private void shutdownAndAwaitTermination(ExecutorService threadPool) {
@Override
public void setDpPartSpecs(Collection<Partition> dpPartSpecs) {
}

/**
* Utility class to process file level stats in parallel.
*/
private static class FileStatProcessor implements Callable <FileStats> {

private final InputSplit dummySplit;
private final InputFormat<?, ?> inputFormat;
private final JobConf jc;
private final FileStatus file;

FileStatProcessor(FileStatus file, InputFormat<?, ?> inputFormat, InputSplit dummySplit, JobConf jc) {
this.file = file;
this.dummySplit = dummySplit;
this.inputFormat = inputFormat;
this.jc = jc;
}

@Override
public FileStats call() throws Exception {
try (org.apache.hadoop.mapred.RecordReader<?, ?> recordReader = inputFormat
.getRecordReader(dummySplit, jc, Reporter.NULL)) {
if (recordReader instanceof StatsProvidingRecordReader) {
StatsProvidingRecordReader statsRR;
statsRR = (StatsProvidingRecordReader) recordReader;
final FileStats fileStats =
new FileStats(statsRR.getStats().getRawDataSize(), statsRR.getStats().getRowCount(), file.getLen(),
file.isErasureCoded());
return fileStats;
} else {
throw new HiveException(String.format("Unexpected file found during reading footers for: %s ", file));
}
}
}
}

/**
* Utility class for holding the file level statistics.
*/
private static class FileStats {

private long numRows = 0;
private long rawDataSize = 0;
private long fileSize = 0;
private long numErasureCodedFiles = 0;

public FileStats(long rawDataSize, long numRows, long fileSize, boolean isErasureCoded) {
this.rawDataSize = rawDataSize;
this.numRows = numRows;
this.fileSize = fileSize;
this.numErasureCodedFiles = isErasureCoded ? 1 : 0;
}

public long getNumRows() {
return numRows;
}

public long getRawDataSize() {
return rawDataSize;
}

public long getFileSize() {
return fileSize;
}

public long getNumErasureCodedFiles() {
return numErasureCodedFiles;
}

}
}

0 comments on commit c97a491

Please sign in to comment.