HIVE-25967: Prevent residual expressions from getting serialized in Iceberg splits (apache#3041) (Adam Szita, reviewed by Peter Vary)
szlta committed Feb 22, 2022
1 parent 247d8aa commit 1aa6ce8
Showing 3 changed files with 88 additions and 2 deletions.
@@ -28,6 +28,10 @@
import org.apache.hadoop.hive.ql.exec.tez.HashableInputSplit;
import org.apache.hadoop.mapred.FileSplit;
import org.apache.iceberg.FileScanTask;
import org.apache.iceberg.common.DynClasses;
import org.apache.iceberg.common.DynFields;
import org.apache.iceberg.expressions.Expressions;
import org.apache.iceberg.expressions.ResidualEvaluator;
import org.apache.iceberg.mr.mapreduce.IcebergSplit;
import org.apache.iceberg.mr.mapreduce.IcebergSplitContainer;
import org.apache.iceberg.relocated.com.google.common.primitives.Longs;
@@ -93,8 +97,46 @@ public long getStart() {
return 0;
}

/**
 * This hack removes residual expressions from the file scan task just before split serialization.
 * Residuals can sometimes take up too much space in the payload, causing the Tez AM to OOM.
 * Unfortunately the Tez AM doesn't distribute splits in a streamed way; it serializes all splits for a job
 * before sending them out to executors. A single residual may take ~1 MB in memory, which, multiplied by
 * thousands of splits, can kill the Tez AM JVM.
 * Until streamed split distribution is implemented, we strip residuals from the split; the executor side
 * doesn't use them anyway (yet).
 */
private static final Class<?> SPLIT_SCAN_TASK_CLAZZ;
private static final DynFields.UnboundField<Object> FILE_SCAN_TASK_FIELD;
private static final DynFields.UnboundField<Object> RESIDUALS_FIELD;
private static final DynFields.UnboundField<Object> EXPR_FIELD;
private static final DynFields.UnboundField<Object> UNPARTITIONED_EXPR_FIELD;

static {
SPLIT_SCAN_TASK_CLAZZ = DynClasses.builder().impl("org.apache.iceberg.BaseFileScanTask$SplitScanTask").build();
FILE_SCAN_TASK_FIELD = DynFields.builder().hiddenImpl(SPLIT_SCAN_TASK_CLAZZ, "fileScanTask").build();
RESIDUALS_FIELD = DynFields.builder().hiddenImpl("org.apache.iceberg.BaseFileScanTask", "residuals").build();
EXPR_FIELD = DynFields.builder().hiddenImpl(ResidualEvaluator.class, "expr").build();
UNPARTITIONED_EXPR_FIELD = DynFields.builder().hiddenImpl("org.apache.iceberg.expressions." +
"ResidualEvaluator$UnpartitionedResidualEvaluator", "expr").build();
}

@Override
public void write(DataOutput out) throws IOException {
for (FileScanTask fileScanTask : icebergSplit().task().files()) {
if (fileScanTask.residual() != Expressions.alwaysTrue() &&
fileScanTask.getClass().isAssignableFrom(SPLIT_SCAN_TASK_CLAZZ)) {

Object residuals = RESIDUALS_FIELD.get(FILE_SCAN_TASK_FIELD.get(fileScanTask));

if (fileScanTask.spec().isPartitioned()) {
EXPR_FIELD.set(residuals, Expressions.alwaysTrue());
} else {
UNPARTITIONED_EXPR_FIELD.set(residuals, Expressions.alwaysTrue());
}

}
}
byte[] bytes = SerializationUtil.serializeToBytes(tableLocation);
out.writeInt(bytes.length);
out.write(bytes);
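
For context, DynClasses and DynFields are Iceberg's reflection helpers (from the iceberg-common module) for loading classes and reaching private fields without a compile-time dependency on internal types. Below is a minimal, self-contained sketch of the same get/set pattern that write() uses above; the Holder class and its field are hypothetical, purely for illustration:

import org.apache.iceberg.common.DynFields;

public class DynFieldsDemo {

  // Hypothetical stand-in for BaseFileScanTask and its private "residuals" field
  private static class Holder {
    private String residual = "id > 123";
  }

  public static void main(String[] args) {
    // hiddenImpl() targets a private field; build() returns an UnboundField
    // that is bound to a concrete instance only at get()/set() time
    DynFields.UnboundField<Object> residualField =
        DynFields.builder().hiddenImpl(Holder.class, "residual").build();

    Holder holder = new Holder();
    System.out.println(residualField.get(holder)); // id > 123

    // Overwrite the private value, just as write() replaces the residual
    // expression with Expressions.alwaysTrue()
    residualField.set(holder, "true");
    System.out.println(residualField.get(holder)); // true
  }
}

Note that the reference comparison fileScanTask.residual() != Expressions.alwaysTrue() in write() is safe because Expressions.alwaysTrue() returns a singleton constant.
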
@@ -34,6 +34,7 @@
import org.apache.iceberg.data.DeleteReadTests;
import org.apache.iceberg.data.InternalRecordWrapper;
import org.apache.iceberg.hadoop.HadoopTables;
import org.apache.iceberg.mr.hive.TestIcebergInputFormats;
import org.apache.iceberg.util.StructLikeSet;
import org.junit.Assert;
import org.junit.Before;
@@ -17,8 +17,12 @@
* under the License.
*/

package org.apache.iceberg.mr;
package org.apache.iceberg.mr.hive;

import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.File;
import java.io.IOException;
import java.io.UncheckedIOException;
@@ -56,7 +60,9 @@
import org.apache.iceberg.expressions.Expressions;
import org.apache.iceberg.hadoop.HadoopCatalog;
import org.apache.iceberg.hadoop.HadoopTables;
import org.apache.iceberg.mr.hive.HiveIcebergInputFormat;
import org.apache.iceberg.mr.Catalogs;
import org.apache.iceberg.mr.InputFormatConfig;
import org.apache.iceberg.mr.TestHelper;
import org.apache.iceberg.mr.mapred.Container;
import org.apache.iceberg.mr.mapred.MapredIcebergInputFormat;
import org.apache.iceberg.mr.mapreduce.IcebergInputFormat;
@@ -75,7 +81,9 @@
import org.junit.runners.Parameterized;

import static org.apache.iceberg.types.Types.NestedField.required;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNotEquals;
import static org.junit.Assert.assertTrue;

@RunWith(Parameterized.class)
@@ -401,6 +409,41 @@ public void testDeriveLlapSetsCacheAffinityForIcebergInputFormat() {
mapWork.getCacheAffinity());
}

@Test
public void testResidualsUnserialized() throws Exception {
helper.createUnpartitionedTable();
List<Record> expectedRecords = helper.generateRandomRecords(10, 0L);
helper.appendToTable(null, expectedRecords);
builder.filter(Expressions.greaterThan("id", 123));

for (InputSplit split : testInputFormat.create(builder.conf()).getSplits()) {

HiveIcebergSplit originalSplit = new HiveIcebergSplit((IcebergSplit) split, "noop");

// In the original split, the residual set by the filter above should still be present
assertNotEquals(
Expressions.alwaysTrue(),
originalSplit.icebergSplit().task().files().stream().findFirst().get().residual()
);

ByteArrayOutputStream baos = new ByteArrayOutputStream();
DataOutputStream out = new DataOutputStream(baos);
originalSplit.write(out);

HiveIcebergSplit deserializedSplit = new HiveIcebergSplit();
ByteArrayInputStream bais = new ByteArrayInputStream(baos.toByteArray());
DataInputStream in = new DataInputStream(bais);
deserializedSplit.readFields(in);

// After ser/de the expression should be always-true
assertEquals(
Expressions.alwaysTrue(),
deserializedSplit.icebergSplit().task().files().stream().findFirst().get().residual()
);
}

}

// TODO - Capture template type T in toString method: https://github.com/apache/iceberg/issues/1542
public abstract static class TestInputFormat<T> {

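
The serialize/deserialize steps in testResidualsUnserialized() above follow the standard Hadoop Writable round-trip. As a sketch, they could be factored into a generic helper like the one below; the WritableRoundTrip class and roundTrip method are hypothetical, not part of this change:

import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import org.apache.hadoop.io.Writable;

public final class WritableRoundTrip {

  private WritableRoundTrip() {
  }

  // Serializes 'original' via write() and reads the bytes back into 'empty'
  // via readFields(), mirroring the steps in the test
  public static <T extends Writable> T roundTrip(T original, T empty) throws IOException {
    ByteArrayOutputStream baos = new ByteArrayOutputStream();
    original.write(new DataOutputStream(baos));

    DataInputStream in = new DataInputStream(new ByteArrayInputStream(baos.toByteArray()));
    empty.readFields(in);
    return empty;
  }
}

With such a helper, the test body reduces to asserting on roundTrip(originalSplit, new HiveIcebergSplit()), since HiveIcebergSplit extends FileSplit and is therefore a Writable.
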
