Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Refactoring the ParquetTools read/write APIs #5358

Merged
merged 19 commits into from
Apr 22, 2024
Merged
Show file tree
Hide file tree
Changes from 12 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
30 changes: 18 additions & 12 deletions Base/src/main/java/io/deephaven/base/FileUtils.java
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,13 @@ public boolean accept(File dir, String name) {
};
private final static String[] EMPTY_STRING_ARRAY = new String[0];

public static final Pattern DUPLICATE_SLASH_PATTERN = Pattern.compile("//+");
public static final char URI_SEPARATOR_CHAR = '/';

public static final String URI_SEPARATOR = "" + URI_SEPARATOR_CHAR;

public static final String REPEATED_URI_SEPARATOR = URI_SEPARATOR + URI_SEPARATOR;

public static final Pattern REPEATED_URI_SEPARATOR_PATTERN = Pattern.compile("//+");

/**
* Cleans the specified path. All files and subdirectories in the path will be deleted. (ie you'll be left with an
Expand Down Expand Up @@ -258,7 +264,7 @@ public boolean accept(File pathname) {

/**
* Take the file source path or URI string and convert it to a URI object. Any unnecessary path separators will be
* removed.
* removed. The URI object will always be {@link URI#isAbsolute() absolute}, i.e., will always have a scheme.
*
* @param source The file source path or URI
* @param isDirectory Whether the source is a directory
Expand All @@ -273,8 +279,8 @@ public static URI convertToURI(final String source, final boolean isDirectory) {
uri = new URI(source);
// Replace two or more consecutive slashes in the path with a single slash
final String path = uri.getPath();
if (path.contains("//")) {
final String canonicalizedPath = DUPLICATE_SLASH_PATTERN.matcher(path).replaceAll("/");
if (path.contains(REPEATED_URI_SEPARATOR)) {
final String canonicalizedPath = REPEATED_URI_SEPARATOR_PATTERN.matcher(path).replaceAll(URI_SEPARATOR);
uri = new URI(uri.getScheme(), uri.getUserInfo(), uri.getHost(), uri.getPort(), canonicalizedPath,
uri.getQuery(), uri.getFragment());
}
Expand All @@ -300,17 +306,17 @@ public static URI convertToURI(final String source, final boolean isDirectory) {
*/
public static URI convertToURI(final File file, final boolean isDirectory) {
String absPath = file.getAbsolutePath();
if (File.separatorChar != '/') {
absPath = absPath.replace(File.separatorChar, '/');
if (File.separatorChar != URI_SEPARATOR_CHAR) {
absPath = absPath.replace(File.separatorChar, URI_SEPARATOR_CHAR);
}
if (absPath.charAt(0) != '/') {
absPath = "/" + absPath;
if (absPath.charAt(0) != URI_SEPARATOR_CHAR) {
absPath = URI_SEPARATOR_CHAR + absPath;
}
if (isDirectory && absPath.charAt(absPath.length() - 1) != '/') {
absPath = absPath + "/";
if (isDirectory && absPath.charAt(absPath.length() - 1) != URI_SEPARATOR_CHAR) {
absPath = absPath + URI_SEPARATOR_CHAR;
}
if (absPath.startsWith("//")) {
absPath = "//" + absPath;
if (absPath.startsWith(REPEATED_URI_SEPARATOR)) {
absPath = REPEATED_URI_SEPARATOR + absPath;
}
try {
return new URI("file", null, absPath, null);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,14 +25,14 @@
import java.util.function.Supplier;
import java.util.stream.Stream;

import static io.deephaven.base.FileUtils.URI_SEPARATOR;

/**
* Extracts a key-value partitioned table layout from a stream of URIs.
*/
public abstract class URIStreamKeyValuePartitionLayout<TLK extends TableLocationKey>
extends KeyValuePartitionLayout<TLK, URI> {

private static final String URI_SEPARATOR = "/";

protected final URI tableRootDirectory;
private final Supplier<LocationTableBuilder> locationTableBuilderFactory;
private final int maxPartitioningLevels;
Expand Down Expand Up @@ -96,7 +96,7 @@ private void getPartitions(@NotNull final URI relativePath,
@NotNull final TIntObjectMap<ColumnNameInfo> partitionColInfo,
final boolean registered) {
final String relativePathString = relativePath.getPath();
// The following assumes that there is exactly one URI_SEPARATOR between each subdirectory in the path
// The following assumes that there is exactly one separator between each subdirectory in the path
final String[] subDirs = relativePathString.split(URI_SEPARATOR);
final int numPartitioningCol = subDirs.length - 1;
if (registered) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@
package io.deephaven.engine.table.impl.sources.regioned;

import io.deephaven.base.FileUtils;
import io.deephaven.datastructures.util.CollectionUtil;
import io.deephaven.engine.context.ExecutionContext;
import io.deephaven.engine.table.*;
import io.deephaven.stringset.ArrayStringSet;
Expand Down Expand Up @@ -226,36 +225,34 @@ public void setUp() throws Exception {
final String tableName = "TestTable";

final PartitionedTable partitionedInputData = inputData.partitionBy("PC");
final File[] partitionedInputDestinations;
final String[] partitionedInputDestinations;
try (final Stream<String> partitionNames = partitionedInputData.table()
.<String>objectColumnIterator("PC").stream()) {
partitionedInputDestinations = partitionNames.map(pcv -> new File(dataDirectory,
"IP" + File.separator + "P" + pcv + File.separator + tableName + File.separator
+ PARQUET_FILE_NAME))
.toArray(File[]::new);
+ PARQUET_FILE_NAME)
.getPath())
.toArray(String[]::new);
}
ParquetTools.writeParquetTables(
ParquetTools.writeTables(
partitionedInputData.constituents(),
partitionedDataDefinition.getWritable(),
parquetInstructions,
partitionedInputDestinations,
CollectionUtil.ZERO_LENGTH_STRING_ARRAY_ARRAY);
parquetInstructions.withTableDefinition(partitionedDataDefinition.getWritable()));

final PartitionedTable partitionedInputMissingData = inputMissingData.view("PC", "II").partitionBy("PC");
final File[] partitionedInputMissingDestinations;
final String[] partitionedInputMissingDestinations;
try (final Stream<String> partitionNames = partitionedInputMissingData.table()
.<String>objectColumnIterator("PC").stream()) {
partitionedInputMissingDestinations = partitionNames.map(pcv -> new File(dataDirectory,
"IP" + File.separator + "P" + pcv + File.separator + tableName + File.separator
+ PARQUET_FILE_NAME))
.toArray(File[]::new);
+ PARQUET_FILE_NAME)
.getPath())
.toArray(String[]::new);
}
ParquetTools.writeParquetTables(
ParquetTools.writeTables(
partitionedInputMissingData.constituents(),
partitionedMissingDataDefinition.getWritable(),
parquetInstructions,
partitionedInputMissingDestinations,
CollectionUtil.ZERO_LENGTH_STRING_ARRAY_ARRAY);
parquetInstructions.withTableDefinition(partitionedMissingDataDefinition.getWritable()));

expected = TableTools
.merge(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,12 +3,17 @@
//
package io.deephaven.parquet.base;

import io.deephaven.UncheckedDeephavenException;
import io.deephaven.util.channel.CachedChannelProvider;
import io.deephaven.util.channel.SeekableChannelContext;
import io.deephaven.util.channel.SeekableChannelsProvider;
import io.deephaven.util.channel.SeekableChannelsProviderLoader;
import org.apache.parquet.format.*;
import org.apache.parquet.format.ColumnOrder;
import org.apache.parquet.format.Type;
import org.apache.parquet.schema.*;
import org.jetbrains.annotations.NotNull;
import org.jetbrains.annotations.Nullable;

import java.io.File;
import java.io.IOException;
Expand Down Expand Up @@ -37,6 +42,76 @@ public class ParquetFileReader {
private final URI rootURI;
private final MessageType type;

/**
rcaudy marked this conversation as resolved.
Show resolved Hide resolved
* Make a {@link ParquetFileReader} for the supplied {@link File}. Wraps {@link IOException} as
* {@link UncheckedDeephavenException}.
*
* @param parquetFile The parquet file or the parquet metadata file
* @param specialInstructions Optional read instructions to pass to {@link SeekableChannelsProvider} while creating
* channels
* @return The new {@link ParquetFileReader}
*/
public static ParquetFileReader create(
@NotNull final File parquetFile,
@Nullable final Object specialInstructions) {
try {
return createChecked(parquetFile, specialInstructions);
} catch (IOException e) {
throw new UncheckedDeephavenException("Failed to create Parquet file reader: " + parquetFile, e);
}
}

/**
* Make a {@link ParquetFileReader} for the supplied {@link URI}. Wraps {@link IOException} as
* {@link UncheckedDeephavenException}.
*
* @param parquetFileURI The URI for the parquet file or the parquet metadata file
* @param specialInstructions Optional read instructions to pass to {@link SeekableChannelsProvider} while creating
* channels
* @return The new {@link ParquetFileReader}
*/
public static ParquetFileReader create(
@NotNull final URI parquetFileURI,
@Nullable final Object specialInstructions) {
try {
return createChecked(parquetFileURI, specialInstructions);
} catch (IOException e) {
throw new UncheckedDeephavenException("Failed to create Parquet file reader: " + parquetFileURI, e);
}
}

/**
* Make a {@link ParquetFileReader} for the supplied {@link File}.
*
* @param parquetFile The parquet file or the parquet metadata file
* @param specialInstructions Optional read instructions to pass to {@link SeekableChannelsProvider} while creating
* channels
* @return The new {@link ParquetFileReader}
* @throws IOException if an IO exception occurs
*/
public static ParquetFileReader createChecked(
@NotNull final File parquetFile,
@Nullable final Object specialInstructions) throws IOException {
return createChecked(convertToURI(parquetFile, false), specialInstructions);
}

/**
* Make a {@link ParquetFileReader} for the supplied {@link URI}.
*
* @param parquetFileURI The URI for the parquet file or the parquet metadata file
* @param specialInstructions Optional read instructions to pass to {@link SeekableChannelsProvider} while creating
* channels
* @return The new {@link ParquetFileReader}
* @throws IOException if an IO exception occurs
*/
public static ParquetFileReader createChecked(
@NotNull final URI parquetFileURI,
@Nullable final Object specialInstructions) throws IOException {
final SeekableChannelsProvider provider = SeekableChannelsProviderLoader.getInstance().fromServiceLoader(
parquetFileURI, specialInstructions);
return new ParquetFileReader(parquetFileURI, new CachedChannelProvider(provider, 1 << 7));
}

/**
* Create a new ParquetFileReader for the provided source.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,11 +9,17 @@
import java.nio.charset.StandardCharsets;
import java.nio.file.Path;

import static io.deephaven.base.FileUtils.URI_SEPARATOR_CHAR;

public final class ParquetUtils {

public static final String METADATA_FILE_NAME = "_metadata";
public static final String COMMON_METADATA_FILE_NAME = "_common_metadata";
public static final String PARQUET_FILE_EXTENSION = ".parquet";
public static final String METADATA_FILE_URI_SUFFIX = URI_SEPARATOR_CHAR + METADATA_FILE_NAME;
public static final String COMMON_METADATA_FILE_URI_SUFFIX = URI_SEPARATOR_CHAR + COMMON_METADATA_FILE_NAME;
public static final String METADATA_FILE_SUFFIX = File.separatorChar + METADATA_FILE_NAME;
public static final String COMMON_METADATA_FILE_SUFFIX = File.separatorChar + COMMON_METADATA_FILE_NAME;
private static final String MAGIC_STR = "PAR1";
public static final byte[] MAGIC = MAGIC_STR.getBytes(StandardCharsets.US_ASCII);

Expand All @@ -35,6 +41,20 @@ public static String getPerFileMetadataKey(final String filePath) {
return "deephaven_per_file_" + filePath.replace(File.separatorChar, '_');
}

/**
* This method verifies if the source points to a parquet file or a metadata file. Provided source can be a local
* file path or a URI. Also, it can point to a parquet file, metadata file or a directory.
*/
public static boolean isParquetFile(@NotNull final String source) {
boolean ret = source.endsWith(PARQUET_FILE_EXTENSION)
|| source.endsWith(METADATA_FILE_URI_SUFFIX)
|| source.endsWith(COMMON_METADATA_FILE_URI_SUFFIX);
if (File.separatorChar != URI_SEPARATOR_CHAR) {
ret = ret || source.endsWith(METADATA_FILE_SUFFIX) || source.endsWith(COMMON_METADATA_FILE_SUFFIX);
}
return ret;
}

/**
* Check if the provided path points to a non-hidden parquet file, and that none of its parents (till rootDir) are
* hidden.
Expand Down
Loading
Loading