
[Enhancement](hive-writer) Adjust table sink exchange rebalancer params. #33397

Merged

Conversation


@kaka11chen kaka11chen commented Apr 8, 2024

Proposed changes

Issue Number: #31442

Change the table sink exchange rebalancer params to node level and adjust them to improve write performance through better load balancing.

rebalancer params:

```
DEFINE_mInt64(table_sink_partition_write_min_data_processed_rebalance_threshold,
              "26214400"); // 25MB
// Minimum partition data processed to rebalance writers in exchange when partition writing
DEFINE_mInt64(table_sink_partition_write_min_partition_data_processed_rebalance_threshold,
              "15728640"); // 15MB
```

Further comments

If this is a relatively large or complex change, kick off the discussion at [email protected] by explaining why you chose the solution you did and what alternatives you considered, etc...

@doris-robot

Thank you for your contribution to Apache Doris.
Don't know what should be done next? See How to process your PR

Since 2024-03-18, the documentation has been moved to doris-website.
See Doris Document.

@kaka11chen (Contributor Author)

run buildall

@github-actions github-actions bot left a comment


clang-tidy made some suggestions

```
@@ -45,13 +45,40 @@ SkewedPartitionRebalancer::SkewedPartitionRebalancer(
          _partition_data_size_since_last_rebalance_per_task(partition_count, 0),
          _estimated_task_bucket_data_size_since_last_rebalance(task_count * task_bucket_count, 0),
          _partition_assignments(partition_count) {
    if (task_addresses != nullptr) {
        CHECK(task_addresses->size() == task_count);
```

warning: use of undeclared identifier 'CHECK' [clang-diagnostic-error]

```
        CHECK(task_addresses->size() == task_count);
        ^
```

Comment on lines 51 to 52

```
    for (int i = 0; i < _task_addresses.size(); ++i) {
        auto it = _assigned_address_to_task_buckets_num.find(_task_addresses[i]);
```

warning: use range-based for loop instead [modernize-loop-convert]

Suggested change

```
-     for (int i = 0; i < _task_addresses.size(); ++i) {
-         auto it = _assigned_address_to_task_buckets_num.find(_task_addresses[i]);
+     for (auto & _task_addresse : _task_addresses) {
+         auto it = _assigned_address_to_task_buckets_num.find(_task_addresse);
```

be/src/vec/exec/skewed_partition_rebalancer.cpp:53:

```
-                 _assigned_address_to_task_buckets_num.insert({_task_addresses[i], 0});
+                 _assigned_address_to_task_buckets_num.insert({_task_addresse, 0});
```

```
    _partition_assignments[partition].emplace_back(std::move(task_bucket));

    for (int i = 0; i < _partition_assignments[partition].size(); ++i) {
```

warning: use range-based for loop instead [modernize-loop-convert]

Suggested change

```
-     for (int i = 0; i < _partition_assignments[partition].size(); ++i) {
+     for (auto & i : _partition_assignments[partition]) {
```

be/src/vec/exec/skewed_partition_rebalancer.cpp:72:

```
-                     _partition_assignments[partition][i].task_address);
+                     i.task_address);
```

be/src/vec/exec/skewed_partition_rebalancer.cpp:74:

```
-                 _assigned_address_to_task_buckets_num[_partition_assignments[partition][i]
+                 _assigned_address_to_task_buckets_num[i
```

```
            _assigned_address_to_task_buckets_num[_partition_assignments[partition][i]
                    .task_address]++;
        } else {
            LOG(FATAL) << "__builtin_unreachable";
```

warning: use of undeclared identifier 'FATAL' [clang-diagnostic-error]

```
                LOG(FATAL) << "__builtin_unreachable";
                    ^
```


```
    TaskBucket(int task_id_, int bucket_id_, int task_bucket_count_)
            : task_id(task_id_), id(task_id_ * task_bucket_count_ + bucket_id_) {}
    TaskBucket(int task_id_, int bucket_id_, int task_bucket_count_, std::string task_address_)
```

warning: pass by value and use std::move [modernize-pass-by-value]

be/src/vec/exec/skewed_partition_rebalancer.h:51:

```
- #include <vector>
+ #include <utility>
+ #include <vector>
```

be/src/vec/exec/skewed_partition_rebalancer.h:67:

```
-                   task_address(task_address_) {}
+                   task_address(std::move(task_address_)) {}
```
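For context, the idiom clang-tidy is suggesting looks roughly like this when applied (a minimal sketch; the surrounding class is simplified from the snippets above):

```cpp
#include <string>
#include <utility>

struct TaskBucket {
    int task_id;
    int id;
    std::string task_address;

    // Taking std::string by value lets callers pass an lvalue (one copy)
    // or an rvalue (no copy), and the member is then move-constructed,
    // which is exactly what modernize-pass-by-value is after.
    TaskBucket(int task_id_, int bucket_id_, int task_bucket_count_,
               std::string task_address_)
            : task_id(task_id_),
              id(task_id_ * task_bucket_count_ + bucket_id_),
              task_address(std::move(task_address_)) {}
};
```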

@kaka11chen (Contributor Author)

run buildall

@github-actions github-actions bot left a comment


clang-tidy made some suggestions

```
@@ -20,6 +20,8 @@

 #include "vec/exec/skewed_partition_rebalancer.h"

 #include <glog/logging.h>
```

warning: 'glog/logging.h' file not found [clang-diagnostic-error]

```
#include <glog/logging.h>
         ^
```

Comment on lines 53 to 54

```
    for (int i = 0; i < _task_addresses.size(); ++i) {
        auto it = _assigned_address_to_task_buckets_num.find(_task_addresses[i]);
```

warning: use range-based for loop instead [modernize-loop-convert]

Suggested change

```
-     for (int i = 0; i < _task_addresses.size(); ++i) {
-         auto it = _assigned_address_to_task_buckets_num.find(_task_addresses[i]);
+     for (auto & _task_addresse : _task_addresses) {
+         auto it = _assigned_address_to_task_buckets_num.find(_task_addresse);
```

be/src/vec/exec/skewed_partition_rebalancer.cpp:55:

```
-                 _assigned_address_to_task_buckets_num.insert({_task_addresses[i], 0});
+                 _assigned_address_to_task_buckets_num.insert({_task_addresse, 0});
```

@kaka11chen kaka11chen force-pushed the adjust_table_sink_exchange_rebalancer branch 2 times, most recently from 73d408c to 9c5baba Compare April 9, 2024 02:09
@kaka11chen (Contributor Author)

run buildall

@kaka11chen kaka11chen force-pushed the adjust_table_sink_exchange_rebalancer branch from 9c5baba to 5ed8c4f Compare April 9, 2024 02:24
@kaka11chen (Contributor Author)

run buildall

@github-actions github-actions bot left a comment


clang-tidy made some suggestions

```
    TaskBucket(int task_id_, int bucket_id_, int task_bucket_count_)
            : task_id(task_id_), id(task_id_ * task_bucket_count_ + bucket_id_) {}
    TaskBucket(int task_id_, int bucket_id_, int task_bucket_count_,
               const std::string& task_address_)
```

warning: pass by value and use std::move [modernize-pass-by-value]

Suggested change

```
-                const std::string& task_address_)
+                std::string task_address_)
```

be/src/vec/exec/skewed_partition_rebalancer.h:69:

```
-                   task_address(task_address_) {}
+                   task_address(std::move(task_address_)) {}
```

@doris-robot

TeamCity be ut coverage result:
Function Coverage: 35.58% (8888/24980)
Line Coverage: 27.34% (73018/267082)
Region Coverage: 26.51% (37741/142389)
Branch Coverage: 23.32% (19235/82494)
Coverage Report: http://coverage.selectdb-in.cc/coverage/5ed8c4f265af9e225ef7b0d4a62b6702e596dfa7_5ed8c4f265af9e225ef7b0d4a62b6702e596dfa7/report/index.html

@kaka11chen kaka11chen force-pushed the adjust_table_sink_exchange_rebalancer branch from b0b22ad to 90ad364 Compare April 9, 2024 08:05

github-actions bot commented Apr 9, 2024

clang-tidy review says "All clean, LGTM! 👍"

@kaka11chen (Contributor Author)

run buildall

@kaka11chen kaka11chen marked this pull request as ready for review April 9, 2024 08:06

github-actions bot commented Apr 9, 2024

clang-tidy review says "All clean, LGTM! 👍"

@doris-robot

TeamCity be ut coverage result:
Function Coverage: 35.63% (8905/24990)
Line Coverage: 27.37% (73140/267255)
Region Coverage: 26.53% (37798/142446)
Branch Coverage: 23.34% (19260/82528)
Coverage Report: http://coverage.selectdb-in.cc/coverage/90ad364b3cb4ec9e708ad54dac01e6afdf6c3ac3_90ad364b3cb4ec9e708ad54dac01e6afdf6c3ac3/report/index.html


@morningman morningman left a comment


LGTM

@github-actions github-actions bot added the approved Indicates a PR has been approved by one committer. label Apr 12, 2024

PR approved by at least one committer and no changes requested.


PR approved by anyone and no changes requested.

@morningman morningman merged commit e824895 into apache:master Apr 12, 2024
26 of 30 checks passed
morningman pushed a commit that referenced this pull request Apr 12, 2024
…ms. (#33397)

Issue Number:  #31442

Change table sink exchange rebalancer params to node level and adjust these params to improve write performance by better balance.

rebalancer params:
```
DEFINE_mInt64(table_sink_partition_write_min_data_processed_rebalance_threshold,
              "26214400"); // 25MB
// Minimum partition data processed to rebalance writers in exchange when partition writing
DEFINE_mInt64(table_sink_partition_write_min_partition_data_processed_rebalance_threshold,
              "15728640"); // 15MB
```
seawinde pushed a commit to seawinde/doris that referenced this pull request Apr 15, 2024
…ms. (apache#33397)

seawinde pushed a commit to seawinde/doris that referenced this pull request Apr 17, 2024
…ms. (apache#33397)

morningman pushed a commit to morningman/doris that referenced this pull request Apr 30, 2024
…ms. (apache#33397)

dataroaring pushed a commit that referenced this pull request May 1, 2024
…4.0 (#34371)

* [feature](insert)use optional location and add hive regression test (#33153)

* [feature](iceberg)The new DDL syntax is added to create iceberg partitioned tables (#33338)

support partition by:

```
create table tb1 (c1 string, ts datetime) engine = iceberg partition by (c1, day(ts)) () properties ("a"="b")
```

* [Enhancement](hive-writer) Adjust table sink exchange rebalancer params. (#33397)

Issue Number:  #31442

Change table sink exchange rebalancer params to node level and adjust these params to improve write performance by better balance.

rebalancer params:
```
DEFINE_mInt64(table_sink_partition_write_min_data_processed_rebalance_threshold,
              "26214400"); // 25MB
// Minimum partition data processed to rebalance writers in exchange when partition writing
DEFINE_mInt64(table_sink_partition_write_min_partition_data_processed_rebalance_threshold,
              "15728640"); // 15MB
```

* [feature](profile) add transaction statistics for profile (#33488)

1. commit total time
2. fs operator total time
   - rename file count
   - rename dir count
   - delete dir count
3. add partition total time
   - add partition count
4. update partition total time
   - update partition count
like:
```
-  Transaction Commit Time: 906ms
    -  FileSystem Operator Time: 833ms
        -  Rename File Count: 4
        -  Rename Dir Count: 0
        -  Delete Dir Count: 0
    -  HMS Add Partition Time: 0ms
        -  HMS Add Partition Count: 0
    -  HMS Update Partition Time: 68ms
        -  HMS Update Partition Count: 4
```

* [feature](iceberg) add iceberg transaction implement (#33629)

Issue #31442

add iceberg transaction

* [feature](insert)support default value when create hive table (#33666)

Issue Number: #31442

hive3 supports creating a table with column default values;
if hive3 is used, we can write default values to the table

* [refactor](filesystem)refactor `filesystem` interface (#33361)

1. Rename `list` to `globList`. The path passed to this `list` needs to contain a wildcard, and the corresponding hdfs interface is `globStatus`, hence the new name `globList`.
2. If you only need to view files based on paths, you can use the `listFiles` operation.
3. Merge the `listLocatedFiles` function into the `listFiles` function. (A rough interface sketch follows this list.)
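A rough sketch of the resulting listing interface (signatures and types are illustrative assumptions, not Doris's actual declarations):

```cpp
#include <cstdint>
#include <string>
#include <vector>

struct FileInfo {
    std::string path;
    int64_t size = 0;
    bool is_file = true;
};

class FileSystem {
public:
    virtual ~FileSystem() = default;

    // The pattern may contain wildcards, mirroring Hadoop's globStatus();
    // hence the rename from list() to globList().
    virtual void globList(const std::string& pattern, std::vector<FileInfo>* files) = 0;

    // Plain listing by path, recursive or not; this also absorbs what the
    // former listLocatedFiles() did.
    virtual void listFiles(const std::string& dir, bool recursive,
                           std::vector<FileInfo>* files) = 0;
};
```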

* [opt](meta-cache) refine the meta cache (#33449)

1. Use `caffeine` instead of `guava cache` to get better performance
2. Add a new class `CacheFactory`

    All (Async)LoadingCache should be built from `CacheFactory`

3. Use a separate executor for different caches

    1. rowCountRefreshExecutor
      For row count cache.
      Row count cache is an async loading cache, and we can ignore the result
      on a cache miss or when the thread pool is full.
      So use a separate executor for this cache.

    2. commonRefreshExecutor
      For other caches. Other caches are sync loading caches,
      but commonRefreshExecutor will be used for async refresh.
      That is, if a cache entry is missing, the cache value will be loaded in the caller thread, synchronously;
      if a cache entry needs a refresh, it will be reloaded in commonRefreshExecutor.

    3. fileListingExecutor
      File listing is a heavy operation, so use a separate executor for it.
      For fileCache, the refresh operation will still use commonRefreshExecutor to trigger refresh.
      And fileListingExecutor will be used to list file.

4. Change the refresh and expire logic of caches

    For most caches, set the `refreshAfterWrite` strategy, so that
    even if a cache entry is expired, the old entry can still be
    used while the new entry is being loaded (see the sketch after this list).

5. Add new global variable `enable_get_row_count_from_file_list`

    Default is true; if false, getting the row count from the file list is disabled.
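As a conceptual sketch of the `refreshAfterWrite` behavior described above (illustrative only, not Doris's or Caffeine's code; the cache must outlive any detached refresh in this toy version): a stale entry keeps serving readers while a background thread reloads it, so reads never block on a refresh.

```cpp
#include <chrono>
#include <functional>
#include <mutex>
#include <thread>
#include <unordered_map>

template <typename K, typename V>
class RefreshAfterWriteCache {
public:
    RefreshAfterWriteCache(std::chrono::milliseconds refresh_after,
                           std::function<V(const K&)> loader)
            : _refresh_after(refresh_after), _loader(std::move(loader)) {}

    V get(const K& key) {
        std::unique_lock<std::mutex> lk(_mu);
        auto it = _map.find(key);
        auto now = std::chrono::steady_clock::now();
        if (it == _map.end()) {
            // Miss: load synchronously in the caller thread.
            lk.unlock();
            V v = _loader(key);
            lk.lock();
            it = _map.insert_or_assign(key, Entry{v, now, false}).first;
            return it->second.value;
        }
        if (!it->second.refreshing && now - it->second.written > _refresh_after) {
            // Stale: trigger one async reload, but return the old value now.
            it->second.refreshing = true;
            std::thread([this, key] {
                V v = _loader(key);
                std::lock_guard<std::mutex> g(_mu);
                _map.insert_or_assign(
                        key, Entry{std::move(v), std::chrono::steady_clock::now(), false});
            }).detach();
        }
        return it->second.value;
    }

private:
    struct Entry {
        V value;
        std::chrono::steady_clock::time_point written;
        bool refreshing = false;
    };
    std::chrono::milliseconds _refresh_after;
    std::function<V(const K&)> _loader;
    std::mutex _mu;
    std::unordered_map<K, Entry> _map;
};
```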

* [bugfix](hive)delete write path after hive insert (#33798)

Issue #31442

1. delete files according to the query id
2. delete the write path after insert

* [Enhancement](multi-catalog) Rewrite `S3URI` to remove tricky virtual bucket mechanism and support different uri styles by flags. (#33858)

Many domestic cloud vendors are compatible with the s3 protocol. However, early versions of the s3 client would only generate path-style http requests (aws/aws-sdk-java-v2#763) when encountering endpoints that do not start with s3, while some cloud vendors only support virtual-host-style http requests.

Therefore, Doris used `forceVirtualHosted` in `S3URI` to convert it into a virtual hosted path and implemented it through path style.
For example:
For s3 uri `s3://my-bucket/data/file.txt`, It will eventually be parsed into:
- virtualBucket: my-bucket
- Bucket: data (the bucket must be set, otherwise the s3 client reports an error). This step in particular is tricky because of the limitations of the s3 client.
- Key: file.txt

Path-style mode is then used to generate an http request that resembles a virtual-host request: the endpoint is set to virtualBucket + the original endpoint, and the bucket and key are set as above.
**However, the bucket and key here are inconsistent with the original s3 concepts; the aws client merely happens to produce a virtual-host-like http request in path-style mode.**

However, in #30799 we upgraded the aws sdk version from 2.17.257 to 2.20.131, and the new aws s3 client already generates virtual-host-style http requests for third-party endpoints by default. So #31111 had to set the path-style option to let the s3 client keep working with doris' virtual bucket mechanism.

**Finally, the virtual bucket mechanism is too confusing and tricky, and we no longer need it with the new version of s3 client.**

### Resolution:

Rewrite `S3URI` to remove tricky virtual bucket mechanism and support different uri styles by flags.

This class represents a fully qualified location in S3 for input/output operations, expressed as a URI.
 #### For AWS S3, common URI styles:
  - AWS Client Style(Hadoop S3 Style): `s3://my-bucket/path/to/file?versionId=abc123&partNumber=77&partNumber=88`
  - Virtual Host Style: `https://my-bucket.s3.us-west-1.amazonaws.com/resources/doc.txt?versionId=abc123&partNumber=77&partNumber=88`
  - Path Style: `https://s3.us-west-1.amazonaws.com/my-bucket/resources/doc.txt?versionId=abc123&partNumber=77&partNumber=88`
 
  Regarding the above-mentioned common styles, we can use `isPathStyle` to control whether to use path style
  or virtual host style.
  "Virtual host style" is currently the mainstream and recommended approach, so the default value of
  `isPathStyle` is false.
 
  #### Other Styles:
  - Virtual Host AWS Client (Hadoop S3) Mixed Style:
    `s3://my-bucket.s3.us-west-1.amazonaws.com/resources/doc.txt?versionId=abc123&partNumber=77&partNumber=88`
  - Path AWS Client (Hadoop S3) Mixed Style:
     `s3://s3.us-west-1.amazonaws.com/my-bucket/resources/doc.txt?versionId=abc123&partNumber=77&partNumber=88`
 
  For these two styles, we can use `isPathStyle` and `forceParsingByStandardUri` to control which one to use:
  - Virtual Host AWS Client (Hadoop S3) Mixed Style: `isPathStyle = false && forceParsingByStandardUri = true`
  - Path AWS Client (Hadoop S3) Mixed Style: `isPathStyle = true && forceParsingByStandardUri = true`
 
  When the incoming location is url-encoded, the encoded string will be returned:
  `getKey()` and `getQueryParams()` will return the encoded string. (A small sketch of composing the two common request styles follows.)
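As a small illustration of the two common styles (a hedged sketch; the helper is not part of Doris):

```cpp
#include <string>

// Compose an S3 request URL in either of the two common styles,
// mirroring the isPathStyle flag described above.
std::string s3_url(const std::string& endpoint, const std::string& bucket,
                   const std::string& key, bool is_path_style) {
    // Path style:         https://<endpoint>/<bucket>/<key>
    // Virtual host style: https://<bucket>.<endpoint>/<key>
    return is_path_style ? "https://" + endpoint + "/" + bucket + "/" + key
                         : "https://" + bucket + "." + endpoint + "/" + key;
}
```

For example, `s3_url("s3.us-west-1.amazonaws.com", "my-bucket", "resources/doc.txt", false)` yields the virtual-host-style URL shown above, and passing `true` yields the path-style one.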

* [improvement](hive)add the `queryid` to the temporary file path (#34278)

Changed `_temp_<table_name>` to `_temp_<queryid>_<table_name>`.
This prevents a collision when a user has a table named `_temp_<table_name>`, and it partitions the temp dir per query (a tiny sketch follows).
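Illustrative only (the helper name is hypothetical): building the per-query temporary directory described above, so concurrent queries and user tables cannot collide.

```cpp
#include <string>

// "_temp_" + query id + "_" + table name, per the naming scheme above.
std::string temp_dir_name(const std::string& query_id, const std::string& table_name) {
    return "_temp_" + query_id + "_" + table_name;
}
```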

* [feature](Cloud) Load index data into index cache when writing data (#34046)

* [Feature](hive-writer) Implements s3 file committer. (#33937)

Issue Number: #31442

[Feature] (hive-writer) Implements s3 file committer. 

The S3 committer will start multipart uploads for all files on the BE side, and then complete those multipart uploads on the FE side. A file whose multipart upload is not completed is not visible, so the atomicity of a single file is guaranteed. It still cannot guarantee the atomicity of multiple files; because hive committers have best-effort semantics, this only shortens the inconsistency window. (A conceptual sketch of the two-phase flow follows the change list.)

## ChangeList:
- Add `used_by_s3_committer` in `FileWriterOptions` on the BE side to start multipart uploads of files, which are then completed on the FE side.
- `cosn://` uses the s3 client on the FE side, because it needs to complete the multipart uploads there.
- Add `Status directoryExists(String dir)` and `Status deleteDirectory` in `FileSystem`.
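Conceptual sketch of the two-phase flow described above (all types and functions are assumptions, not Doris's API): the BE uploads every part but deliberately skips CompleteMultipartUpload, so nothing becomes visible until the FE-side commit completes each pending upload.

```cpp
#include <string>
#include <vector>

struct PendingUpload {
    std::string key;                 // object key of the written file
    std::string upload_id;           // multipart upload id returned by S3
    std::vector<std::string> etags;  // one etag per uploaded part
};

// BE side: upload all parts of a file, return the handle without completing.
PendingUpload be_write_file(/* writer state */);

// FE side: called at commit time; only now do the files become visible.
void fe_commit(const std::vector<PendingUpload>& uploads) {
    for (const auto& u : uploads) {
        // A real implementation would call S3 CompleteMultipartUpload with
        // u.key, u.upload_id and u.etags; only then is the file visible.
        (void)u;
    }
}
```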

---------

Co-authored-by: slothever <[email protected]>
Co-authored-by: wuwenchi <[email protected]>
Co-authored-by: Qi Chen <[email protected]>
Co-authored-by: AlexYue <[email protected]>
Labels: approved (Indicates a PR has been approved by one committer.), dev/2.1.3-merged, reviewed