Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Refactoring the ParquetTools read/write APIs #5358

Merged
merged 19 commits into from
Apr 22, 2024
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
Next Next commit
Working on code review comments
  • Loading branch information
malhotrashivam committed Apr 17, 2024
commit fdc5df27d755b1dd82db23b0bd9c784c931f6a8a
30 changes: 17 additions & 13 deletions Base/src/main/java/io/deephaven/base/FileUtils.java
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,13 @@ public boolean accept(File dir, String name) {
};
private final static String[] EMPTY_STRING_ARRAY = new String[0];

public static final Pattern DUPLICATE_SLASH_PATTERN = Pattern.compile("//+");
public static final char URI_SEPARATOR_CHAR = '/';

public static final String URI_SEPARATOR = "" + URI_SEPARATOR_CHAR;

public static final String REPEATED_URI_SEPARATOR = URI_SEPARATOR + URI_SEPARATOR;

public static final Pattern REPEATED_URI_SEPARATOR_PATTERN = Pattern.compile("//+");

/**
* Cleans the specified path. All files and subdirectories in the path will be deleted. (ie you'll be left with an
Expand Down Expand Up @@ -256,8 +262,6 @@ public boolean accept(File pathname) {
}
}

public static final String URI_SEPARATOR = "/";

/**
* Take the file source path or URI string and convert it to a URI object. Any unnecessary path separators will be
* removed. The URI object will always be {@link URI#isAbsolute() absolute}, i.e., will always have a scheme.
Expand All @@ -275,8 +279,8 @@ public static URI convertToURI(final String source, final boolean isDirectory) {
uri = new URI(source);
// Replace two or more consecutive slashes in the path with a single slash
final String path = uri.getPath();
if (path.contains("//")) {
final String canonicalizedPath = DUPLICATE_SLASH_PATTERN.matcher(path).replaceAll("/");
if (path.contains(REPEATED_URI_SEPARATOR)) {
final String canonicalizedPath = REPEATED_URI_SEPARATOR_PATTERN.matcher(path).replaceAll(URI_SEPARATOR);
uri = new URI(uri.getScheme(), uri.getUserInfo(), uri.getHost(), uri.getPort(), canonicalizedPath,
uri.getQuery(), uri.getFragment());
}
Expand All @@ -302,17 +306,17 @@ public static URI convertToURI(final String source, final boolean isDirectory) {
*/
public static URI convertToURI(final File file, final boolean isDirectory) {
String absPath = file.getAbsolutePath();
if (File.separatorChar != '/') {
absPath = absPath.replace(File.separatorChar, '/');
if (File.separatorChar != URI_SEPARATOR_CHAR) {
absPath = absPath.replace(File.separatorChar, URI_SEPARATOR_CHAR);
}
if (absPath.charAt(0) != '/') {
absPath = "/" + absPath;
if (absPath.charAt(0) != URI_SEPARATOR_CHAR) {
absPath = URI_SEPARATOR_CHAR + absPath;
}
if (isDirectory && absPath.charAt(absPath.length() - 1) != '/') {
absPath = absPath + "/";
if (isDirectory && absPath.charAt(absPath.length() - 1) != URI_SEPARATOR_CHAR) {
absPath = absPath + URI_SEPARATOR_CHAR;
}
if (absPath.startsWith("//")) {
absPath = "//" + absPath;
if (absPath.startsWith(REPEATED_URI_SEPARATOR)) {
absPath = REPEATED_URI_SEPARATOR + absPath;
}
try {
return new URI("file", null, absPath, null);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,17 +9,17 @@
import java.nio.charset.StandardCharsets;
import java.nio.file.Path;

import static io.deephaven.base.FileUtils.URI_SEPARATOR;
import static io.deephaven.base.FileUtils.URI_SEPARATOR_CHAR;

public final class ParquetUtils {

public static final String METADATA_FILE_NAME = "_metadata";
public static final String COMMON_METADATA_FILE_NAME = "_common_metadata";
public static final String PARQUET_FILE_EXTENSION = ".parquet";
public static final String METADATA_FILE_URI_SUFFIX = URI_SEPARATOR + METADATA_FILE_NAME;
public static final String COMMON_METADATA_FILE_URI_SUFFIX = URI_SEPARATOR + COMMON_METADATA_FILE_NAME;
public static final String METADATA_FILE_SUFFIX = File.separator + METADATA_FILE_NAME;
public static final String COMMON_METADATA_FILE_SUFFIX = File.separator + COMMON_METADATA_FILE_NAME;
public static final String METADATA_FILE_URI_SUFFIX = URI_SEPARATOR_CHAR + METADATA_FILE_NAME;
public static final String COMMON_METADATA_FILE_URI_SUFFIX = URI_SEPARATOR_CHAR + COMMON_METADATA_FILE_NAME;
public static final String METADATA_FILE_SUFFIX = File.separatorChar + METADATA_FILE_NAME;
public static final String COMMON_METADATA_FILE_SUFFIX = File.separatorChar + COMMON_METADATA_FILE_NAME;
private static final String MAGIC_STR = "PAR1";
public static final byte[] MAGIC = MAGIC_STR.getBytes(StandardCharsets.US_ASCII);

Expand All @@ -41,6 +41,20 @@ public static String getPerFileMetadataKey(final String filePath) {
return "deephaven_per_file_" + filePath.replace(File.separatorChar, '_');
}

/**
 * Returns {@code true} if the provided source ends with the parquet file extension or with a
 * metadata file name ({@code _metadata} or {@code _common_metadata}). The source may be a local
 * file path or a URI string; suffixes are checked against the URI separator and, where it
 * differs, the platform file separator as well.
 */
public static boolean isParquetFile(@NotNull final String source) {
    if (source.endsWith(PARQUET_FILE_EXTENSION)
            || source.endsWith(METADATA_FILE_URI_SUFFIX)
            || source.endsWith(COMMON_METADATA_FILE_URI_SUFFIX)) {
        return true;
    }
    // On platforms whose file separator is not '/', also accept platform-style metadata suffixes
    return File.separatorChar != URI_SEPARATOR_CHAR
            && (source.endsWith(METADATA_FILE_SUFFIX) || source.endsWith(COMMON_METADATA_FILE_SUFFIX));
}

/**
* Check if the provided path points to a non-hidden parquet file, and that none of its parents (till rootDir) are
* hidden.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -153,9 +153,6 @@ public enum ParquetFileLayout {
static final String FILE_INDEX_TOKEN = "{i}";
private static final String DEFAULT_BASE_NAME_FOR_PARTITIONED_PARQUET_DATA = UUID_TOKEN;

private static final Optional<ParquetFileLayout> DEFAULT_FILE_LAYOUT = Optional.empty();
private static final Optional<TableDefinition> DEFAULT_TABLE_DEFINITION = Optional.empty();

public ParquetInstructions() {}

public final String getColumnNameFromParquetColumnNameOrDefault(final String parquetColumnName) {
Expand All @@ -173,12 +170,6 @@ public final String getColumnNameFromParquetColumnNameOrDefault(final String par
@Override
public abstract String getCodecArgs(final String columnName);

@Nullable
abstract KeyedObjectHashMap<String, ColumnInstructions> getColumnNameToInstructionsMap();

@Nullable
abstract KeyedObjectHashMap<String, ColumnInstructions> getParquetColumnNameToInstructionsMap();

/**
* @return A hint that the writer should use dictionary-based encoding for writing this column; never evaluated for
* non-String columns, defaults to false
Expand Down Expand Up @@ -222,15 +213,17 @@ public final String getColumnNameFromParquetColumnNameOrDefault(final String par
public abstract Optional<TableDefinition> getTableDefinition();

/**
* Creates a new {@link ParquetInstructions} object with the same properties as the current object and definition
* Creates a new {@link ParquetInstructions} object with the same properties as the current object but definition
* set as the provided {@link TableDefinition}.
*/
final ParquetInstructions withTableDefinition(final TableDefinition tableDefinition) {
return new ReadOnly(getColumnNameToInstructionsMap(), getParquetColumnNameToInstructionsMap(),
getCompressionCodecName(), getMaximumDictionaryKeys(), getMaximumDictionarySize(), isLegacyParquet(),
getTargetPageSize(), isRefreshing(), getSpecialInstructions(), generateMetadataFiles(),
baseNameForPartitionedParquetData(), getFileLayout(), Optional.of(tableDefinition));
}
public abstract ParquetInstructions withTableDefinition(final TableDefinition tableDefinition);

/**
* Creates a new {@link ParquetInstructions} object with the same properties as the current object but definition
* and layout set as the provided values.
*/
public abstract ParquetInstructions withTableDefinitionAndLayout(final TableDefinition tableDefinition,
final ParquetFileLayout fileLayout);

/**
* @return the base name for partitioned parquet data. Check
Expand Down Expand Up @@ -282,18 +275,6 @@ public boolean useDictionary(final String columnName) {
return false;
}

@Override
@Nullable
KeyedObjectHashMap<String, ColumnInstructions> getColumnNameToInstructionsMap() {
return null;
}

@Override
@Nullable
KeyedObjectHashMap<String, ColumnInstructions> getParquetColumnNameToInstructionsMap() {
return null;
}

@Override
@Nullable
public Object getSpecialInstructions() {
Expand Down Expand Up @@ -342,12 +323,27 @@ public String baseNameForPartitionedParquetData() {

@Override
public Optional<ParquetFileLayout> getFileLayout() {
return DEFAULT_FILE_LAYOUT;
return Optional.empty();
}

@Override
public Optional<TableDefinition> getTableDefinition() {
return DEFAULT_TABLE_DEFINITION;
return Optional.empty();
}

@Override
public ParquetInstructions withTableDefinition(@Nullable final TableDefinition tableDefinition) {
    // The EMPTY instructions carry no file layout, so delegate with a null layout.
    return withTableDefinitionAndLayout(tableDefinition, null);
}

@Override
public ParquetInstructions withTableDefinitionAndLayout(
        @Nullable final TableDefinition tableDefinition,
        @Nullable final ParquetFileLayout fileLayout) {
    // Build a ReadOnly copy of the (empty) defaults, overriding only definition and layout
    final Optional<ParquetFileLayout> layout = Optional.ofNullable(fileLayout);
    final Optional<TableDefinition> definition = Optional.ofNullable(tableDefinition);
    return new ReadOnly(null, null, getCompressionCodecName(), getMaximumDictionaryKeys(),
            getMaximumDictionarySize(), isLegacyParquet(), getTargetPageSize(), isRefreshing(),
            getSpecialInstructions(), generateMetadataFiles(), baseNameForPartitionedParquetData(),
            layout, definition);
}
};

Expand Down Expand Up @@ -507,16 +503,6 @@ public boolean useDictionary(final String columnName) {
return getOrDefault(columnName, false, ColumnInstructions::useDictionary);
}

@Override
KeyedObjectHashMap<String, ColumnInstructions> getColumnNameToInstructionsMap() {
return columnNameToInstructions;
}

@Override
KeyedObjectHashMap<String, ColumnInstructions> getParquetColumnNameToInstructionsMap() {
return parquetColumnNameToInstructions;
}

@Override
public String getCompressionCodecName() {
return compressionCodecName;
Expand Down Expand Up @@ -573,6 +559,22 @@ public Optional<TableDefinition> getTableDefinition() {
return tableDefinition;
}

@Override
public ParquetInstructions withTableDefinition(@Nullable final TableDefinition useDefinition) {
    // Preserve this instance's existing layout (if any) while swapping in the new definition.
    return withTableDefinitionAndLayout(useDefinition, getFileLayout().orElse(null));
}

@Override
public ParquetInstructions withTableDefinitionAndLayout(
        @Nullable final TableDefinition useDefinition,
        @Nullable final ParquetFileLayout useLayout) {
    // Copy every property of this ReadOnly instance, replacing only definition and layout
    final Optional<ParquetFileLayout> layout = Optional.ofNullable(useLayout);
    final Optional<TableDefinition> definition = Optional.ofNullable(useDefinition);
    return new ReadOnly(columnNameToInstructions, parquetColumnNameToInstructions,
            getCompressionCodecName(), getMaximumDictionaryKeys(), getMaximumDictionarySize(),
            isLegacyParquet(), getTargetPageSize(), isRefreshing(), getSpecialInstructions(),
            generateMetadataFiles(), baseNameForPartitionedParquetData(), layout, definition);
}

KeyedObjectHashMap<String, ColumnInstructions> copyColumnNameToInstructions() {
// noinspection unchecked
return (columnNameToInstructions == null)
Expand Down Expand Up @@ -626,8 +628,8 @@ public static class Builder {
private Object specialInstructions;
private boolean generateMetadataFiles = DEFAULT_GENERATE_METADATA_FILES;
private String baseNameForPartitionedParquetData = DEFAULT_BASE_NAME_FOR_PARTITIONED_PARQUET_DATA;
private Optional<ParquetFileLayout> fileLayout = DEFAULT_FILE_LAYOUT;
private Optional<TableDefinition> tableDefinition = DEFAULT_TABLE_DEFINITION;
private Optional<ParquetFileLayout> fileLayout = Optional.empty();
private Optional<TableDefinition> tableDefinition = Optional.empty();

public Builder() {}

Expand Down
Loading
Loading