
[FLINK-35768][FLIP-444] Native file copy support #25028

Merged (8 commits, Jul 12, 2024)

Conversation

pnowojski (Contributor)

What is the purpose of the change

State downloading in Flink can be a time- and CPU-consuming operation, which is especially visible if CPU resources per task slot are strictly restricted to, for example, a single CPU. Downloading 1GB of state can take a significant amount of time, while the code doing so is quite inefficient.

Currently, when downloading state files, Flink creates an FSDataInputStream from the remote file and copies its bytes to an OutputStream pointing to a local file (in the RocksDBStateDownloader#downloadDataForStateHandle method). Internally, FSDataInputStream is wrapped in many layers of abstractions and indirections, and what’s worse, every file is copied individually, which leads to quite high overheads for small files. Download times and the CPU efficiency of the download process can be significantly improved if we introduce an API that allows org.apache.flink.core.fs.FileSystem to copy many files natively, all at once.
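As an illustrative sketch (not the actual Flink code; the `copyStream` name and buffer size are made up here), the per-file download path described above amounts to a byte-copy loop like this, paid once per state file, with every read going through the stream's wrapper layers:

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;

public class StreamCopySketch {
    // Simplified sketch of the per-file copy described above: read the remote
    // stream chunk by chunk and write each chunk to the local file. Every file
    // pays the stream-setup overhead individually, which hurts for small files.
    static long copyStream(InputStream in, OutputStream out) throws IOException {
        byte[] buffer = new byte[64 * 1024];
        long total = 0;
        int read;
        while ((read = in.read(buffer)) != -1) {
            out.write(buffer, 0, read);
            total += read;
        }
        return total;
    }

    public static void main(String[] args) throws IOException {
        byte[] data = new byte[300_000];
        for (int i = 0; i < data.length; i++) data[i] = (byte) i;
        ByteArrayOutputStream out = new ByteArrayOutputStream();
        long copied = copyStream(new ByteArrayInputStream(data), out);
        System.out.println(copied); // prints 300000
    }
}
```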

For S3, there are at least two potential implementations. The first one is to use AWS SDK v2 directly (Flink currently uses AWS SDK v1 wrapped by Hadoop/Presto) together with the Amazon S3 Transfer Manager. The second option is to use a 3rd party tool called s5cmd. It is claimed to be a faster alternative to the official AWS clients, which was confirmed by our benchmarks.
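To make the batching idea concrete, here is a hedged sketch of what a bulk-copy capability could look like. The names (`PathsCopyingFileSystem`, `CopyRequest`, `copyFiles`) follow the spirit of FLIP-444 but are illustrative, not Flink's actual API, and the local implementation exists only to make the example runnable:

```java
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.List;

// Hypothetical sketch of a bulk "native copy" capability in the spirit of
// FLIP-444. All names here are illustrative, not Flink's real API.
public class BulkCopyDemo {

    // One copy request: source file -> destination file.
    record CopyRequest(Path source, Path destination) {}

    // A filesystem that receives many files in one call can batch them,
    // e.g. by handing the whole list to a single external s5cmd process,
    // instead of opening one stream pair per file.
    interface PathsCopyingFileSystem {
        void copyFiles(List<CopyRequest> requests) throws IOException;
    }

    // Naive local implementation used only to make the sketch runnable;
    // it still copies one by one, which is exactly what batching avoids.
    static final PathsCopyingFileSystem LOCAL = requests -> {
        for (CopyRequest r : requests) {
            Files.copy(r.source(), r.destination());
        }
    };

    public static void main(String[] args) throws IOException {
        Path dir = Files.createTempDirectory("bulk-copy-demo");
        Path src = Files.writeString(dir.resolve("src.txt"), "hello");
        Path dst = dir.resolve("dst.txt");
        LOCAL.copyFiles(List.of(new CopyRequest(src, dst)));
        System.out.println(Files.readString(dst)); // prints "hello"
    }
}
```

The point of the single `copyFiles` call is that an S3 implementation is free to translate the whole batch into one native transfer (Transfer Manager upload/download batch, or one s5cmd invocation), amortizing per-file overhead.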

Brief change log

This PR covers the first two steps of FLIP-444:

Verifying this change

This change is covered by a couple of new ITCases that run s5cmd against MinIO, as well as some new unit tests.

Does this pull request potentially affect one of the following parts:

  • Dependencies (does it add or upgrade a dependency): (yes / no)
  • The public API, i.e., is any changed class annotated with @Public(Evolving): (yes / no)
  • The serializers: (yes / no / don't know)
  • The runtime per-record code paths (performance sensitive): (yes / no / don't know)
  • Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Kubernetes/Yarn, ZooKeeper: (yes / no / don't know)
  • The S3 file system connector: (yes / no / don't know)

Documentation

  • Does this pull request introduce a new feature? (yes / no)
  • If yes, how is the feature documented? (not applicable / docs / JavaDocs / not documented)

@flinkbot (Collaborator) commented Jul 5, 2024

CI report:

@rkhachatryan (Contributor) left a comment

Thanks for the PR, LGTM in general.
I've left some comments, PTAL.

(disclaimer: I've reviewed the internal version of this PR already)

Comment on lines +45 to +46
public static final ConfigOption<String> ACCESS_KEY =
        ConfigOptions.key("s3.access-key")
rkhachatryan (Contributor):

I couldn't find generated docs for these options (all s3 options).
Should we start generating in this hotfix commit?

pnowojski (Contributor, Author):

Should we start generating in this hotfix commit?

Probably a good idea, but I guess that would require some larger rework of the docs. I presume the docs, including the list of configuration parameters, are currently manually crafted, without any automatically generated parts.

My motivation was to have those ConfigOptions in one place here, to avoid copy/pasting the keys in various places in the code, including tests.

Comment on lines 116 to +117
public void setS3ConfigOptions(Configuration config) {
    config.setString(FLINK_CONFIG_S3_ENDPOINT, getHttpEndpoint());
    config.set(AbstractS3FileSystemFactory.ENDPOINT, getHttpEndpoint());
rkhachatryan (Contributor):

nit: re-order commits so that the use of options goes after their introduction?

Currently, I see it as

[hotfix] Use newly defined ConfigOptions in MinioTestContainer 
[hotfix] Move CompressionUtils to flink-core
[hotfix] Create ConfigOptions for s3 access/secret keys and endpoint 

pnowojski (Contributor, Author):

Huh? For me this ("[hotfix] Use newly defined ConfigOptions in MinioTestContainer") is the 3rd commit

Comment on lines +45 to +47
default Optional<org.apache.flink.core.fs.Path> maybeGetPath() {
    return Optional.empty();
}
rkhachatryan (Contributor):

Should this also be implemented by DirectoryStreamStateHandle and SegmentFileStateHandle?

pnowojski (Contributor, Author):

It looks like DirectoryStreamStateHandle can not be used for recovery anyway, due to:

    public FSDataInputStream openInputStream() {
        throw new UnsupportedOperationException();
    }

Also, in that case I think we would have to add special handling of directories (the s5cmd docs do not state it clearly, but it looks like for download, and only for download, you need to add a wildcard *?).

Directly copying files for SegmentFileStateHandle might be less efficient if, roughly speaking, the desired segment is < 50% of the underlying file. So I'm not sure I would use it. Maybe in the future we should add some code letting the filesystem decide whether it's worth copying a given state handle or not? 🤔 That is, moving more of the logic for deciding how to download handles from the RocksDBStateDownloader to the FileSystem?
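For context, the `maybeGetPath` default plus a file-backed override discussed in this thread could be sketched like this (simplified stand-in types, not Flink's actual classes):

```java
import java.util.Optional;

public class MaybeGetPathSketch {
    // Simplified stand-ins for Flink's Path and StreamStateHandle
    // (hypothetical, for illustration only).
    record Path(String uri) {}

    interface StreamStateHandle {
        // Default from the PR: handles not backed by a single file report
        // "no path", and callers fall back to the stream-copy download path.
        default Optional<Path> maybeGetPath() {
            return Optional.empty();
        }
    }

    // A file-backed handle can expose its path so a natively-copying
    // filesystem (e.g. the s5cmd-based one) can copy the file directly.
    static class FileStateHandle implements StreamStateHandle {
        private final Path filePath;

        FileStateHandle(Path filePath) {
            this.filePath = filePath;
        }

        @Override
        public Optional<Path> maybeGetPath() {
            return Optional.of(filePath);
        }
    }

    public static void main(String[] args) {
        StreamStateHandle fileBacked = new FileStateHandle(new Path("s3://bucket/chk-42/state"));
        StreamStateHandle opaque = new StreamStateHandle() {};
        System.out.println(fileBacked.maybeGetPath().isPresent()); // prints true
        System.out.println(opaque.maybeGetPath().isPresent());     // prints false
    }
}
```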

rkhachatryan (Contributor):

I was thinking about other potential usages of maybeGetPath (not s5cmd), e.g. in tests. I'd assume that these handles would return some path, and returning empty would break this assumption.
And this wouldn't necessarily mean that s5 will be used for these handles - it also depends on the caller (rocksdb downloader).
Anyways, this would be a NIT, so please feel free to leave it as is.

pnowojski (Contributor, Author):

Ok, I see.

And this wouldn't necessarily mean that s5 will be used for these handles - it also depends on the caller (rocksdb downloader).

I somehow have a feeling that in the future we might want to rethink how to decide which downloading/uploading code path to use.

@pnowojski (Contributor, Author) left a comment

I've updated the PR, can you take a look again @rkhachatryan ?


@rkhachatryan (Contributor) left a comment

LGTM

@pnowojski pnowojski changed the title [FLINK-35739][FLIP-444] Native file copy support [FLINK-35768][FLIP-444] Native file copy support Jul 10, 2024