[FLINK-35768][FLIP-444] Native file copy support #25028
Thanks for the PR, LGTM in general.
I've left some comments, PTAL.
(disclaimer: I've reviewed the internal version of this PR already)
public static final ConfigOption<String> ACCESS_KEY =
        ConfigOptions.key("s3.access-key")
I couldn't find generated docs for these options (all s3 options).
Should we start generating in this hotfix commit?
> Should we start generating in this hotfix commit?
Probably a good idea, but I guess that would require some larger rework of the docs. I presume the docs, including the list of configuration parameters, are currently crafted manually, without any automatically generated part.
My motivation was to have those ConfigOptions in one place here, to avoid copy/pasting the keys in various places in the code, including tests.
public void setS3ConfigOptions(Configuration config) {
-    config.setString(FLINK_CONFIG_S3_ENDPOINT, getHttpEndpoint());
+    config.set(AbstractS3FileSystemFactory.ENDPOINT, getHttpEndpoint());
nit: re-order commits so that the use of options goes after their introduction?
Currently, I see it as:
1. [hotfix] Use newly defined ConfigOptions in MinioTestContainer
2. [hotfix] Move CompressionUtils to flink-core
3. [hotfix] Create ConfigOptions for s3 access/secret keys and endpoint
Huh? For me this ("[hotfix] Use newly defined ConfigOptions in MinioTestContainer") is the 3rd commit
default Optional<org.apache.flink.core.fs.Path> maybeGetPath() {
    return Optional.empty();
}
Should this also be implemented by DirectoryStreamStateHandle and SegmentFileStateHandle?
It looks like DirectoryStreamStateHandle cannot be used for recovery anyway, due to:
public FSDataInputStream openInputStream() {
    throw new UnsupportedOperationException();
}
Also, in that case I think we would have to add special handling of directories (the s5cmd docs do not state that clearly, but it looks like for download, and only for download, you need to add a wildcard *?).
Directly copying files for SegmentFileStateHandle might be less efficient if, roughly speaking, the desired segment is < 50% of the underlying file. So I'm not sure if I would use it. Maybe in the future we should add some code letting the filesystem decide whether it's worth copying a given state handle or not? 🤔 Moving more of the logic for deciding how to download handles from the RocksDBDownloader to FileSystem?
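To make the "< 50%" rule of thumb above concrete, here is a minimal sketch of what such a decision could look like. The class and method names are hypothetical and not part of Flink, and the threshold is an assumption taken from the rough figure in the comment, not a measured value.

```java
// Hypothetical helper, not part of Flink: decides whether copying the whole
// underlying file is worthwhile when only one segment of it is needed.
final class SegmentCopyHeuristic {
    private SegmentCopyHeuristic() {}

    // Returns true when the wanted segment makes up at least minFraction
    // (between 0.0 and 1.0) of the underlying file.
    static boolean worthCopyingWholeFile(long segmentBytes, long fileBytes, double minFraction) {
        if (segmentBytes < 0 || fileBytes <= 0 || segmentBytes > fileBytes) {
            throw new IllegalArgumentException("invalid sizes");
        }
        if (minFraction < 0.0 || minFraction > 1.0) {
            throw new IllegalArgumentException("minFraction must be within [0, 1]");
        }
        return (double) segmentBytes / fileBytes >= minFraction;
    }
}
```

A caller could use `SegmentCopyHeuristic.worthCopyingWholeFile(segmentLength, fileLength, 0.5)` and fall back to the stream-based download when it returns false.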
I was thinking about other potential usages of maybeGetPath (not s5cmd), e.g. in tests. I'd assume that these handles would return some path, and returning empty would break this assumption.
And this wouldn't necessarily mean that s5 will be used for these handles - it also depends on the caller (rocksdb downloader).
Anyways, this would be a NIT, so please feel free to leave it as is.
Ok, I see.
> And this wouldn't necessarily mean that s5 will be used for these handles - it also depends on the caller (rocksdb downloader).
I somehow have a feeling that in the future we might want to rethink how to decide which downloading/uploading code path to use.
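For readers following this thread, the pattern under discussion can be sketched roughly as below. All type names are hypothetical stand-ins, and `java.nio.file.Path` is used in place of `org.apache.flink.core.fs.Path` so the snippet compiles on its own. It only illustrates how a default `maybeGetPath()` leaves the choice of code path to the caller.

```java
import java.nio.file.Path;
import java.util.Optional;

// Hypothetical stand-in for a state handle interface; not Flink's actual type.
interface SketchStateHandle {
    // Handles that are not backed by a single file keep the empty default.
    default Optional<Path> maybeGetPath() {
        return Optional.empty();
    }
}

// A handle backed by exactly one file exposes its path.
final class SketchFileHandle implements SketchStateHandle {
    private final Path path;

    SketchFileHandle(Path path) {
        this.path = path;
    }

    @Override
    public Optional<Path> maybeGetPath() {
        return Optional.of(path);
    }
}

// A handle whose bytes live in memory has no path to offer.
final class SketchInMemoryHandle implements SketchStateHandle {}

final class MaybeGetPathDemo {
    private MaybeGetPathDemo() {}

    // The caller branches on presence instead of assuming every handle has a path.
    static String describe(SketchStateHandle handle) {
        return handle.maybeGetPath()
                .map(p -> "copy natively: " + p)
                .orElse("fall back to stream copy");
    }
}
```

Returning a path does not force the native route. As noted above, the caller still decides which download path to take.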
I've updated the PR. Can you take another look, @rkhachatryan?
LGTM
What is the purpose of the change
State downloading in Flink can be a time- and CPU-consuming operation, which is especially visible if CPU resources per task slot are strictly restricted to, for example, a single CPU. Downloading 1 GB of state can take a significant amount of time, and the code doing so is quite inefficient.
Currently, when downloading state files, Flink creates an FSDataInputStream from the remote file and copies its bytes to an OutputStream pointing to a local file (in the RocksDBStateDownloader#downloadDataForStateHandle method). Internally, FSDataInputStream is wrapped by many layers of abstraction and indirection, and, what's worse, every file is copied individually, which leads to quite high overhead for small files. Download times and the CPU efficiency of the download process can be significantly improved if we introduce an API that allows org.apache.flink.core.fs.FileSystem to copy many files natively and all at once.
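As a rough illustration of the idea (not the API introduced by this PR), a batch-copy contract could look like the sketch below. `SketchCopyRequest`, `SketchBatchCopier`, `StreamingCopier` and `BatchCopyDemo` are hypothetical names, and local files stand in for remote ones. The baseline implementation mirrors the per-file stream copy described above; a native implementation would instead hand the whole list to a single tool invocation.

```java
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.Collections;
import java.util.List;

// Hypothetical request type: one source file and where it should land.
final class SketchCopyRequest {
    final Path source;
    final Path destination;

    SketchCopyRequest(Path source, Path destination) {
        this.source = source;
        this.destination = destination;
    }
}

// Hypothetical batch-copy contract: the caller passes all files at once.
interface SketchBatchCopier {
    void copyFiles(List<SketchCopyRequest> requests) throws IOException;
}

// Baseline: one stream pair per file, i.e. the behaviour the PR wants to avoid.
final class StreamingCopier implements SketchBatchCopier {
    @Override
    public void copyFiles(List<SketchCopyRequest> requests) throws IOException {
        byte[] buffer = new byte[8192];
        for (SketchCopyRequest request : requests) {
            try (InputStream in = Files.newInputStream(request.source);
                    OutputStream out = Files.newOutputStream(request.destination)) {
                int read;
                while ((read = in.read(buffer)) != -1) {
                    out.write(buffer, 0, read);
                }
            }
        }
    }
}

final class BatchCopyDemo {
    private BatchCopyDemo() {}

    // Writes content to a temp file, copies it through the batch contract, reads it back.
    static String roundTrip(String content) {
        try {
            Path src = Files.createTempFile("sketch-src", ".bin");
            Path dst = Files.createTempFile("sketch-dst", ".bin");
            try {
                Files.write(src, content.getBytes(StandardCharsets.UTF_8));
                new StreamingCopier()
                        .copyFiles(Collections.singletonList(new SketchCopyRequest(src, dst)));
                return new String(Files.readAllBytes(dst), StandardCharsets.UTF_8);
            } finally {
                Files.deleteIfExists(src);
                Files.deleteIfExists(dst);
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }
}
```

Because callers only see the batch contract, a faster implementation can replace `StreamingCopier` without changing the download logic.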
For S3, there are at least two potential implementations. The first one is to use AWS SDKv2 directly (Flink currently uses AWS SDKv1 wrapped by hadoop/presto) together with the Amazon S3 Transfer Manager. The second option is to use a third-party tool called s5cmd. It is claimed to be a faster alternative to the official AWS clients, which was confirmed by our benchmarks.
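One way the s5cmd option could batch many files into a single invocation is through the tool's `run` command, which reads one command per line. The sketch below only builds such lines and is not the code from this PR. The class name, bucket and paths are made up for illustration, and paths containing whitespace would need quoting that this sketch does not handle.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

// Hypothetical helper: turns source/destination pairs into "cp" lines that
// could be fed to `s5cmd run` in one process invocation.
final class S5cmdBatchSketch {
    private S5cmdBatchSketch() {}

    // Emits one "cp <source> <destination>" line per entry, in iteration order.
    static List<String> toRunCommands(Map<String, String> sourceToDestination) {
        List<String> lines = new ArrayList<>();
        for (Map.Entry<String, String> entry : sourceToDestination.entrySet()) {
            lines.add("cp " + entry.getKey() + " " + entry.getValue());
        }
        return lines;
    }
}
```

The resulting lines would be written to a file or to the process's standard input; actually starting s5cmd and passing credentials and the endpoint is left out here.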
Brief change log
This PR covers the first two steps of FLIP-444:
Verifying this change
This change is covered by a couple of new ITCases that run s5cmd against Minio, and by some unit tests.
Does this pull request potentially affect one of the following parts:
@Public(Evolving): (yes / no)
Documentation