This repository has been archived by the owner on Nov 16, 2022. It is now read-only.

Commit

Pass file size to Iceberg commit task
jackye1995 authored and zhenxiao committed Dec 28, 2021
1 parent a3b7569 commit 1d71890
Showing 4 changed files with 14 additions and 6 deletions.
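The change threads the size of each written data file from the page sink, through CommitTaskData, into Iceberg's DataFiles.Builder, so the commit path can describe a file from metadata it already has instead of re-opening the file to discover its length. As a rough illustration only, here is a minimal, self-contained sketch of the kind of DataFiles.Builder call the connector ends up making; the path, size, record count, and unpartitioned spec below are illustrative assumptions (the connector itself supplies the table's spec and full column Metrics, as the diffs show), not values from this commit:

import org.apache.iceberg.DataFile;
import org.apache.iceberg.DataFiles;
import org.apache.iceberg.FileFormat;
import org.apache.iceberg.PartitionSpec;

public class DataFileFromMetadataSketch
{
    public static void main(String[] args)
    {
        // Hypothetical values standing in for what the writer reports at commit time.
        String path = "/warehouse/db/tbl/data/part-00000.parquet";
        long fileSizeInBytes = 123_456L;
        long recordCount = 1_000L;

        // Describe the file purely from known metadata: no InputFile, and no extra
        // filesystem call just to stat the file's length.
        DataFile dataFile = DataFiles.builder(PartitionSpec.unpartitioned())
                .withPath(path)
                .withFileSizeInBytes(fileSizeInBytes)
                .withFormat(FileFormat.PARQUET)
                .withRecordCount(recordCount)
                .build();

        System.out.println(dataFile.path() + " -> " + dataFile.fileSizeInBytes() + " bytes");
    }
}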
Changed file 1 of 4
@@ -23,16 +23,19 @@
 public class CommitTaskData
 {
     private final String path;
+    private final long fileSizeInBytes;
     private final MetricsWrapper metrics;
     private final Optional<String> partitionDataJson;

     @JsonCreator
     public CommitTaskData(
             @JsonProperty("path") String path,
+            @JsonProperty("fileSizeInBytes") long fileSizeInBytes,
             @JsonProperty("metrics") MetricsWrapper metrics,
             @JsonProperty("partitionDataJson") Optional<String> partitionDataJson)
     {
         this.path = requireNonNull(path, "path is null");
+        this.fileSizeInBytes = fileSizeInBytes;
         this.metrics = requireNonNull(metrics, "metrics is null");
         this.partitionDataJson = requireNonNull(partitionDataJson, "partitionDataJson is null");
     }
@@ -43,6 +46,12 @@ public String getPath()
         return path;
     }

+    @JsonProperty
+    public long getFileSizeInBytes()
+    {
+        return fileSizeInBytes;
+    }
+
     @JsonProperty
     public MetricsWrapper getMetrics()
     {
Changed file 2 of 4
@@ -447,10 +447,9 @@ public Optional<ConnectorOutputMetadata> finishInsert(ConnectorSession session,

         AppendFiles appendFiles = transaction.newFastAppend();
         for (CommitTaskData task : commitTasks) {
-            HdfsContext context = new HdfsContext(session, table.getSchemaName(), table.getTableName());
-
             DataFiles.Builder builder = DataFiles.builder(icebergTable.spec())
-                    .withInputFile(new HdfsInputFile(new Path(task.getPath()), hdfsEnvironment, context))
+                    .withPath(task.getPath())
+                    .withFileSizeInBytes(task.getFileSizeInBytes())
                     .withFormat(table.getFileFormat())
                     .withMetrics(task.getMetrics().metrics());

Changed file 3 of 4
@@ -59,7 +59,6 @@
 import org.apache.iceberg.exceptions.AlreadyExistsException;
 import org.apache.iceberg.exceptions.NamespaceNotEmptyException;
 import org.apache.iceberg.exceptions.NoSuchTableException;
-import org.apache.iceberg.io.FileIO;
 import org.apache.iceberg.types.Type;
 import org.apache.iceberg.types.TypeUtil;
 import org.apache.iceberg.types.Types;
@@ -411,10 +410,10 @@ public Optional<ConnectorOutputMetadata> finishInsert(ConnectorSession session,
                 .toArray(Type[]::new);

         AppendFiles appendFiles = transaction.newFastAppend();
-        FileIO io = transaction.table().io();
         for (CommitTaskData task : commitTasks) {
             DataFiles.Builder builder = DataFiles.builder(icebergTable.spec())
-                    .withInputFile(io.newInputFile(task.getPath()))
+                    .withPath(task.getPath())
+                    .withFileSizeInBytes(task.getFileSizeInBytes())
                     .withFormat(table.getFileFormat())
                     .withMetrics(task.getMetrics().metrics());

Changed file 4 of 4
@@ -159,6 +159,7 @@ public CompletableFuture<Collection<Slice>> finish()

             CommitTaskData task = new CommitTaskData(
                     context.getPath().toString(),
+                    context.writer.getFileSizeInBytes(),
                     new MetricsWrapper(context.writer.getMetrics()),
                     context.getPartitionData().map(PartitionData::toJson));

