HIVE-6910 : Invalid column access info for partitioned table (Navis via Ashutosh Chauhan)

git-svn-id: https://svn.apache.org/repos/asf/hive/trunk@1597694 13f79535-47bb-0310-9956-ffa450edef68
ashutoshc committed May 27, 2014
1 parent 4087a24 commit d20e4e1
Showing 56 changed files with 1,854 additions and 1,118 deletions.
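What the patch does, as the hunks below show: the neededColumnIDs/neededColumns bookkeeping moves from TableScanOperator into TableScanDesc; the column pruner additionally records every column actually referenced (virtual columns included) as referencedColumns; the partition pruner collects which partition columns its pruning expression touched and carries them on PrunedPartitionList; and ColumnAccessAnalyzer reports access from these name lists instead of indexing table.getCols() by needed-column ID, which resolved the wrong name (or overran the list) for partitioned tables because getCols() holds only non-partition columns. A rough before/after sketch of the analyzer's inner loop, with fragments lifted from the diff and the surrounding scaffolding (op, table, tableName, parts, columnAccessInfo) assumed:

    // Before: IDs were resolved against table.getCols(), which holds only
    // non-partition columns, so access info came out wrong for partitioned tables.
    List<FieldSchema> tableCols = table.getCols();
    for (int i : op.getNeededColumnIDs()) {
      columnAccessInfo.add(tableName, tableCols.get(i).getName());
    }

    // After: columns are tracked by name on the scan operator, and partition
    // columns referred to by the pruner are added from the PrunedPartitionList.
    for (String column : op.getReferencedColumns()) {
      columnAccessInfo.add(tableName, column);
    }
    for (String partKey : parts.getReferredPartCols()) {
      columnAccessInfo.add(tableName, partKey);
    }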
28 changes: 13 additions & 15 deletions ql/src/java/org/apache/hadoop/hive/ql/exec/TableScanOperator.java
@@ -243,31 +243,28 @@ static public String getOperatorName() {
return "TS";
}

-// This 'neededColumnIDs' field is included in this operator class instead of
-// its desc class.The reason is that 1)tableScanDesc can not be instantiated,
-// and 2) it will fail some join and union queries if this is added forcibly
-// into tableScanDesc.
-// Both neededColumnIDs and neededColumns should never be null.
-// When neededColumnIDs is an empty list,
-// it means no needed column (e.g. we do not need any column to evaluate
-// SELECT count(*) FROM t).
-List<Integer> neededColumnIDs;
-List<String> neededColumns;
-
public void setNeededColumnIDs(List<Integer> orign_columns) {
-neededColumnIDs = orign_columns;
+conf.setNeededColumnIDs(orign_columns);
}

public List<Integer> getNeededColumnIDs() {
-return neededColumnIDs;
+return conf.getNeededColumnIDs();
}

public void setNeededColumns(List<String> columnNames) {
-neededColumns = columnNames;
+conf.setNeededColumns(columnNames);
}

public List<String> getNeededColumns() {
-return neededColumns;
+return conf.getNeededColumns();
}

+public void setReferencedColumns(List<String> referencedColumns) {
+conf.setReferencedColumns(referencedColumns);
+}
+
+public List<String> getReferencedColumns() {
+return conf.getReferencedColumns();
+}

@Override
@@ -335,6 +332,7 @@ public Operator<? extends OperatorDesc> clone()
TableScanOperator ts = (TableScanOperator) super.clone();
ts.setNeededColumnIDs(new ArrayList<Integer>(getNeededColumnIDs()));
ts.setNeededColumns(new ArrayList<String>(getNeededColumns()));
+ts.setReferencedColumns(new ArrayList<String>(getReferencedColumns()));
return ts;
}

@@ -312,6 +312,7 @@ public Object process(Node nd, Stack<Node> stack, NodeProcessorCtx ctx,
cols);
List<Integer> neededColumnIds = new ArrayList<Integer>();
List<String> neededColumnNames = new ArrayList<String>();
+List<String> referencedColumnNames = new ArrayList<String>();
RowResolver inputRR = cppCtx.getOpToParseCtxMap().get(scanOp).getRowResolver();
TableScanDesc desc = scanOp.getConf();
List<VirtualColumn> virtualCols = desc.getVirtualCols();
@@ -322,11 +323,12 @@ public Object process(Node nd, Stack<Node> stack, NodeProcessorCtx ctx,
cols.add(VirtualColumn.RAWDATASIZE.getName());
}

-for (int i = 0; i < cols.size(); i++) {
-String[] tabCol = inputRR.reverseLookup(cols.get(i));
-if(tabCol == null) {
+for (String column : cols) {
+String[] tabCol = inputRR.reverseLookup(column);
+if (tabCol == null) {
continue;
}
+referencedColumnNames.add(column);
ColumnInfo colInfo = inputRR.get(tabCol[0], tabCol[1]);
if (colInfo.getIsVirtualCol()) {
// part is also a virtual column, but part col should not in this
@@ -340,17 +342,18 @@ public Object process(Node nd, Stack<Node> stack, NodeProcessorCtx ctx,
//no need to pass virtual columns to reader.
continue;
}
-int position = inputRR.getPosition(cols.get(i));
+int position = inputRR.getPosition(column);
if (position >= 0) {
// get the needed columns by id and name
neededColumnIds.add(position);
-neededColumnNames.add(cols.get(i));
+neededColumnNames.add(column);
}
}

desc.setVirtualCols(newVirtualCols);
scanOp.setNeededColumnIDs(neededColumnIds);
scanOp.setNeededColumns(neededColumnNames);
+scanOp.setReferencedColumns(referencedColumnNames);
return null;
}
}
@@ -147,7 +147,8 @@ public Object process(Node nd, Stack<Node> stack, NodeProcessorCtx opProcCtx,
.getConfirmedPartitionsForScan(parseInfo);
if (confirmedPartns.size() > 0) {
Table source = parseCtx.getQB().getMetaData().getTableForAlias(alias);
-PrunedPartitionList partList = new PrunedPartitionList(source, confirmedPartns, false);
+List<String> partCols = GenMapRedUtils.getPartitionColumns(parseInfo);
+PrunedPartitionList partList = new PrunedPartitionList(source, confirmedPartns, partCols, false);
GenMapRedUtils.setTaskPlan(currAliasId, currTopOp, currTask, false, ctx, partList);
} else { // non-partitioned table
GenMapRedUtils.setTaskPlan(currAliasId, currTopOp, currTask, false, ctx);
@@ -20,6 +20,7 @@

import java.io.Serializable;
import java.util.ArrayList;
+import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
@@ -927,6 +928,7 @@ public static TableScanOperator createTemporaryTableScanOperator(RowSchema rowSc
}
tableScanOp.setNeededColumnIDs(neededColumnIds);
tableScanOp.setNeededColumns(neededColumnNames);
+tableScanOp.setReferencedColumns(neededColumnNames);
return tableScanOp;
}

@@ -1747,6 +1749,14 @@ public static Set<Partition> getConfirmedPartitionsForScan(QBParseInfo parseInfo
return confirmedPartns;
}

+public static List<String> getPartitionColumns(QBParseInfo parseInfo) {
+tableSpec tblSpec = parseInfo.getTableSpec();
+if (tblSpec.tableHandle.isPartitioned()) {
+return new ArrayList<String>(tblSpec.getPartSpec().keySet());
+}
+return Collections.emptyList();
+}

public static List<Path> getInputPathsForPartialScan(QBParseInfo parseInfo, StringBuffer aggregationKey)
throws SemanticException {
List<Path> inputPaths = new ArrayList<Path>();
@@ -212,9 +212,9 @@ static private ExprNodeDesc compactExpr(ExprNodeDesc expr) {
return isAnd ? children.get(0) : null;
}
}
-return (ExprNodeGenericFuncDesc)expr;
+return expr;
} else {
-throw new IllegalStateException("Unexpected type of ExprNodeDesc: " + expr.getExprString());
+throw new IllegalStateException("Unexpected type of ExprNodeDesc: " + expr.getExprString());
}
}

@@ -225,18 +225,23 @@ static private ExprNodeDesc compactExpr(ExprNodeDesc expr) {
* The expression is only used to prune by partition name, so we have no business with VCs.
* @param expr original partition pruning expression.
* @param partCols list of partition columns for the table.
+* @param referred partition columns referred by expr
* @return partition pruning expression that only contains partition columns from the list.
*/
-static private ExprNodeDesc removeNonPartCols(ExprNodeDesc expr, List<String> partCols) {
-if (expr instanceof ExprNodeColumnDesc
-&& !partCols.contains(((ExprNodeColumnDesc) expr).getColumn())) {
-// Column doesn't appear to be a partition column for the table.
-return new ExprNodeConstantDesc(expr.getTypeInfo(), null);
+static private ExprNodeDesc removeNonPartCols(ExprNodeDesc expr, List<String> partCols,
+Set<String> referred) {
+if (expr instanceof ExprNodeColumnDesc) {
+String column = ((ExprNodeColumnDesc) expr).getColumn();
+if (!partCols.contains(column)) {
+// Column doesn't appear to be a partition column for the table.
+return new ExprNodeConstantDesc(expr.getTypeInfo(), null);
+}
+referred.add(column);
+}
if (expr instanceof ExprNodeGenericFuncDesc) {
List<ExprNodeDesc> children = expr.getChildren();
for (int i = 0; i < children.size(); ++i) {
-children.set(i, removeNonPartCols(children.get(i), partCols));
+children.set(i, removeNonPartCols(children.get(i), partCols, referred));
}
}
return expr;
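For intuition, here is a toy, self-contained model of the removeNonPartCols contract above (hypothetical Expr/Col/Const/Func classes, not Hive's ExprNodeDesc hierarchy): the walk replaces references to non-partition columns with a null constant and accumulates in referred the partition columns the expression actually touched, which is what ultimately flows into the column access info.

    import java.util.*;

    abstract class Expr {}
    class Col extends Expr { final String name; Col(String n) { name = n; } }
    class Const extends Expr { final Object value; Const(Object v) { value = v; } }
    class Func extends Expr {
      final List<Expr> children;
      Func(Expr... cs) { children = new ArrayList<>(Arrays.asList(cs)); }
    }

    class Demo {
      static Expr removeNonPartCols(Expr expr, List<String> partCols, Set<String> referred) {
        if (expr instanceof Col) {
          String column = ((Col) expr).name;
          if (!partCols.contains(column)) {
            return new Const(null);      // non-partition column: null it out
          }
          referred.add(column);          // partition column: remember it was referred to
        }
        if (expr instanceof Func) {
          List<Expr> children = ((Func) expr).children;
          for (int i = 0; i < children.size(); ++i) {
            children.set(i, removeNonPartCols(children.get(i), partCols, referred));
          }
        }
        return expr;
      }

      public static void main(String[] args) {
        // (ds = '2008-04-08') AND (key > 10), with partition columns [ds, hr]
        Expr pred = new Func(new Func(new Col("ds"), new Const("2008-04-08")),
                             new Func(new Col("key"), new Const(10)));
        Set<String> referred = new LinkedHashSet<>();
        removeNonPartCols(pred, Arrays.asList("ds", "hr"), referred);
        System.out.println(referred);    // prints [ds]; key was replaced by a null constant
      }
    }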
@@ -266,7 +271,7 @@ private static PrunedPartitionList getPartitionsFromServer(Table tab,
try {
if (!tab.isPartitioned()) {
// If the table is not partitioned, return everything.
-return new PrunedPartitionList(tab, getAllPartitions(tab), false);
+return new PrunedPartitionList(tab, getAllPartitions(tab), null, false);
}
LOG.debug("tabname = " + tab.getTableName() + " is partitioned");

@@ -279,18 +284,19 @@

if (prunerExpr == null) {
// Non-strict mode, and there is no predicates at all - get everything.
-return new PrunedPartitionList(tab, getAllPartitions(tab), false);
+return new PrunedPartitionList(tab, getAllPartitions(tab), null, false);
}

+Set<String> referred = new LinkedHashSet<String>();
// Replace virtual columns with nulls. See javadoc for details.
-prunerExpr = removeNonPartCols(prunerExpr, extractPartColNames(tab));
+prunerExpr = removeNonPartCols(prunerExpr, extractPartColNames(tab), referred);
// Remove all parts that are not partition columns. See javadoc for details.
ExprNodeGenericFuncDesc compactExpr = (ExprNodeGenericFuncDesc)compactExpr(prunerExpr.clone());
String oldFilter = prunerExpr.getExprString();
if (compactExpr == null) {
// Non-strict mode, and all the predicates are on non-partition columns - get everything.
LOG.debug("Filter " + oldFilter + " was null after compacting");
-return new PrunedPartitionList(tab, getAllPartitions(tab), true);
+return new PrunedPartitionList(tab, getAllPartitions(tab), null, true);
}

LOG.debug("Filter w/ compacting: " + compactExpr.getExprString()
@@ -326,6 +332,7 @@ private static PrunedPartitionList getPartitionsFromServer(Table tab,
// metastore and so some partitions may have no data based on other filters.
boolean isPruningByExactFilter = oldFilter.equals(compactExpr.getExprString());
return new PrunedPartitionList(tab, new LinkedHashSet<Partition>(partitions),
+new ArrayList<String>(referred),
hasUnknownPartitions || !isPruningByExactFilter);
} catch (HiveException e) {
throw e;
@@ -22,8 +22,8 @@

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.hive.metastore.api.FieldSchema;
import org.apache.hadoop.hive.ql.exec.TableScanOperator;
+import org.apache.hadoop.hive.ql.metadata.HiveException;
import org.apache.hadoop.hive.ql.metadata.Table;

public class ColumnAccessAnalyzer {
@@ -44,9 +44,23 @@ public ColumnAccessInfo analyzeColumnAccess() throws SemanticException {
for (TableScanOperator op : topOps.keySet()) {
Table table = topOps.get(op);
String tableName = table.getCompleteName();
-List<FieldSchema> tableCols = table.getCols();
-for (int i : op.getNeededColumnIDs()) {
-columnAccessInfo.add(tableName, tableCols.get(i).getName());
+List<String> referenced = op.getReferencedColumns();
+for (String column : referenced) {
+columnAccessInfo.add(tableName, column);
+}
+if (table.isPartitioned()) {
+PrunedPartitionList parts;
+try {
+parts = pGraphContext.getPrunedPartitions(table.getTableName(), op);
+} catch (HiveException e) {
+LOG.error(org.apache.hadoop.util.StringUtils.stringifyException(e));
+throw new SemanticException(e.getMessage(), e);
+}
+if (parts.getReferredPartCols() != null) {
+for (String partKey : parts.getReferredPartCols()) {
+columnAccessInfo.add(tableName, partKey);
+}
+}
}
}
return columnAccessInfo;
@@ -28,7 +28,6 @@
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hadoop.hive.ql.DriverContext;
-import org.apache.hadoop.hive.ql.ErrorMsg;
import org.apache.hadoop.hive.ql.exec.TableScanOperator;
import org.apache.hadoop.hive.ql.exec.Task;
import org.apache.hadoop.hive.ql.exec.TaskFactory;
@@ -40,12 +39,6 @@
import org.apache.hadoop.hive.ql.metadata.Partition;
import org.apache.hadoop.hive.ql.metadata.Table;
import org.apache.hadoop.hive.ql.optimizer.GenMapRedUtils;
-import org.apache.hadoop.hive.ql.parse.BaseSemanticAnalyzer.tableSpec;
-import org.apache.hadoop.hive.ql.parse.GenTezWork;
-import org.apache.hadoop.hive.ql.parse.ParseContext;
-import org.apache.hadoop.hive.ql.parse.PrunedPartitionList;
-import org.apache.hadoop.hive.ql.parse.QBParseInfo;
-import org.apache.hadoop.hive.ql.parse.SemanticException;
import org.apache.hadoop.hive.ql.plan.MapWork;
import org.apache.hadoop.hive.ql.plan.StatsNoJobWork;
import org.apache.hadoop.hive.ql.plan.TezWork;
@@ -150,7 +143,8 @@ public Object process(Node nd, Stack<Node> stack,
PrunedPartitionList partitions = null;
if (confirmedPartns.size() > 0) {
Table source = queryBlock.getMetaData().getTableForAlias(alias);
-partitions = new PrunedPartitionList(source, confirmedPartns, false);
+List<String> partCols = GenMapRedUtils.getPartitionColumns(parseInfo);
+partitions = new PrunedPartitionList(source, confirmedPartns, partCols, false);
}

MapWork w = utils.createMapWork(context, tableScan, tezWork, partitions);
@@ -36,11 +36,16 @@ public class PrunedPartitionList {
/** Partitions that either satisfy the partition criteria, or may satisfy it. */
private Set<Partition> partitions;

+/** partition columns referred by pruner expr */
+private List<String> referred;
+
/** Whether there are partitions in the list that may or may not satisfy the criteria. */
private boolean hasUnknowns;

-public PrunedPartitionList(Table source, Set<Partition> partitions, boolean hasUnknowns) {
+public PrunedPartitionList(Table source, Set<Partition> partitions, List<String> referred,
+boolean hasUnknowns) {
this.source = source;
+this.referred = referred;
this.partitions = partitions;
this.hasUnknowns = hasUnknowns;
}
@@ -70,4 +75,8 @@ public List<Partition> getNotDeniedPartns() {
public boolean hasUnknownPartitions() {
return hasUnknowns;
}

+public List<String> getReferredPartCols() {
+return referred;
+}
}
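Taken with the PartitionPruner hunks above, the intended plumbing is roughly the following sketch (the variables prunerExpr, partColNames, table, partitions, hasUnknowns, and columnAccessInfo are assumed for illustration; this is not code from the patch):

    Set<String> referred = new LinkedHashSet<String>();   // filled by removeNonPartCols
    prunerExpr = removeNonPartCols(prunerExpr, partColNames, referred);
    PrunedPartitionList partList = new PrunedPartitionList(
        table, partitions, new ArrayList<String>(referred), hasUnknowns);
    // ...later, in ColumnAccessAnalyzer:
    for (String partCol : partList.getReferredPartCols()) {
      columnAccessInfo.add(table.getCompleteName(), partCol);
    }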
@@ -531,7 +531,7 @@ public tableSpec getTableSpec(String tName) {
}

/**
-* This method is used only for the anlayze command to get the partition specs
+* This method is used only for the analyze command to get the partition specs
*/
public tableSpec getTableSpec() {

39 changes: 39 additions & 0 deletions ql/src/java/org/apache/hadoop/hive/ql/plan/TableScanDesc.java
@@ -23,6 +23,7 @@
import java.util.List;
import java.util.Map;

+import org.apache.hadoop.hive.ql.exec.PTFUtils;
import org.apache.hadoop.hive.ql.metadata.VirtualColumn;

/**
@@ -34,6 +35,10 @@
public class TableScanDesc extends AbstractOperatorDesc {
private static final long serialVersionUID = 1L;

+static {
+PTFUtils.makeTransient(TableScanDesc.class, "filterObject", "referencedColumns");
+}

private String alias;

private List<VirtualColumn> virtualCols;
@@ -64,6 +69,16 @@ public class TableScanDesc extends AbstractOperatorDesc {
private ExprNodeGenericFuncDesc filterExpr;
private transient Serializable filterObject;

+// Both neededColumnIDs and neededColumns should never be null.
+// When neededColumnIDs is an empty list,
+// it means no needed column (e.g. we do not need any column to evaluate
+// SELECT count(*) FROM t).
+private List<Integer> neededColumnIDs;
+private List<String> neededColumns;
+
+// all column names referenced including virtual columns. used in ColumnAccessAnalyzer
+private transient List<String> referencedColumns;

public static final String FILTER_EXPR_CONF_STR =
"hive.io.filter.expr.serialized";

@@ -125,6 +140,30 @@ public void setFilterObject(Serializable filterObject) {
this.filterObject = filterObject;
}

+public void setNeededColumnIDs(List<Integer> neededColumnIDs) {
+this.neededColumnIDs = neededColumnIDs;
+}
+
+public List<Integer> getNeededColumnIDs() {
+return neededColumnIDs;
+}
+
+public void setNeededColumns(List<String> neededColumns) {
+this.neededColumns = neededColumns;
+}
+
+public List<String> getNeededColumns() {
+return neededColumns;
+}
+
+public void setReferencedColumns(List<String> referencedColumns) {
+this.referencedColumns = referencedColumns;
+}
+
+public List<String> getReferencedColumns() {
+return referencedColumns;
+}

public void setAlias(String alias) {
this.alias = alias;
}
5 changes: 5 additions & 0 deletions ql/src/test/queries/clientpositive/column_access_stats.q
@@ -160,3 +160,8 @@ FROM
JOIN T3
ON T3.key = T4.key
ORDER BY T3.key, T4.key;

+-- for partitioned table
+SELECT * FROM srcpart TABLESAMPLE (10 ROWS);
+SELECT key,ds FROM srcpart TABLESAMPLE (10 ROWS) WHERE hr='11';
+SELECT value FROM srcpart TABLESAMPLE (10 ROWS) WHERE ds='2008-04-08';