Skip to content

Commit

Permalink
[fix](scan) Fix missing sync rowsets in cloud mode (#31717)
Browse files Browse the repository at this point in the history
  • Loading branch information
platoneko committed Mar 4, 2024
1 parent c90728c commit fb2c401
Show file tree
Hide file tree
Showing 4 changed files with 38 additions and 11 deletions.
13 changes: 6 additions & 7 deletions be/src/cloud/cloud_meta_mgr.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -74,10 +74,7 @@ Status bthread_fork_join(const std::vector<std::function<Status()>>& tasks, int
return nullptr;
};

std::vector<bthread_t> bthread_ids;
bthread_ids.resize(tasks.size());
for (int task_idx = 0; task_idx < tasks.size(); ++task_idx) {
auto* task = &(tasks[task_idx]);
for (const auto& task : tasks) {
{
std::unique_lock lk(lock);
// Wait until there are available slots
Expand All @@ -93,8 +90,8 @@ Status bthread_fork_join(const std::vector<std::function<Status()>>& tasks, int
}

// dispatch task into bthreads
auto* fn = new std::function<void()>([&, task] {
auto st = (*task)();
auto* fn = new std::function<void()>([&, &task = task] {
auto st = task();
{
std::lock_guard lk(lock);
--count;
Expand All @@ -104,7 +101,9 @@ Status bthread_fork_join(const std::vector<std::function<Status()>>& tasks, int
cond.notify_one();
}
});
if (bthread_start_background(&bthread_ids[task_idx], nullptr, run_bthread_work, fn) != 0) {

bthread_t bthread_id;
if (bthread_start_background(&bthread_id, nullptr, run_bthread_work, fn) != 0) {
run_bthread_work(fn);
}
}
Expand Down
4 changes: 2 additions & 2 deletions be/src/common/sync_point.h
Original file line number Diff line number Diff line change
Expand Up @@ -155,15 +155,15 @@ auto try_any_cast_ret(std::vector<std::any>& any) {
{ \
std::pair ret {default_ret_val, false}; \
std::vector<std::any> args {__VA_ARGS__}; \
args.push_back(&ret); \
args.emplace_back(&ret); \
doris::SyncPoint::get_instance()->process(x, std::move(args)); \
if (ret.second) return std::move(ret.first); \
}
// Sync-point hook for use inside functions that return void.
// Declares a local `pred` flag (initially false), packs the macro's variadic
// arguments into a std::vector<std::any>, appends `&pred` to that vector, and
// passes it to doris::SyncPoint::get_instance()->process(x, ...). If `pred`
// is true afterwards, the macro executes `return;` in the enclosing function.
// Presumably the handler registered for sync point `x` sets `pred` through
// the appended pointer -- TODO confirm against SyncPoint::process.
// NOTE(review): this text comes from a rendered diff; the `push_back` and
// `emplace_back` lines below look like the removed/added pair of a single
// change, so only one of them is presumably present in the real header --
// confirm against the file itself.
#define SYNC_POINT_RETURN_WITH_VOID(x, ...) \
{ \
bool pred = false; \
std::vector<std::any> args {__VA_ARGS__}; \
args.push_back(&pred); \
args.emplace_back(&pred); \
doris::SyncPoint::get_instance()->process(x, std::move(args)); \
if (pred) return; \
}
Expand Down
15 changes: 15 additions & 0 deletions be/src/olap/parallel_scanner_builder.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,9 @@

#include "parallel_scanner_builder.h"

#include "cloud/cloud_meta_mgr.h"
#include "cloud/cloud_tablet.h"
#include "cloud/config.h"
#include "olap/rowset/beta_rowset.h"
#include "pipeline/exec/olap_scan_operator.h"
#include "vec/exec/scan/new_olap_scanner.h"
Expand All @@ -40,6 +43,18 @@ template <typename ParentType>
Status ParallelScannerBuilder<ParentType>::_build_scanners_by_rowid(
std::list<VScannerSPtr>& scanners) {
DCHECK_GE(_rows_per_scanner, _min_rows_per_scanner);

if (config::is_cloud_mode()) {
std::vector<std::function<Status()>> tasks;
tasks.reserve(_tablets.size());
for (auto&& [tablet, version] : _tablets) {
tasks.emplace_back([tablet, version]() {
return std::dynamic_pointer_cast<CloudTablet>(tablet)->sync_rowsets(version);
});
}
RETURN_IF_ERROR(cloud::bthread_fork_join(tasks, 10));
}

for (auto&& [tablet, version] : _tablets) {
DCHECK(_all_rowsets.contains(tablet->tablet_id()));
auto& rowsets = _all_rowsets[tablet->tablet_id()];
Expand Down
17 changes: 15 additions & 2 deletions be/src/pipeline/exec/olap_scan_operator.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,9 @@

#include <memory>

#include "cloud/cloud_meta_mgr.h"
#include "cloud/cloud_tablet.h"
#include "cloud/config.h"
#include "olap/parallel_scanner_builder.h"
#include "olap/storage_engine.h"
#include "olap/tablet_manager.h"
Expand Down Expand Up @@ -277,8 +280,18 @@ Status OlapScanLocalState::_init_scanners(std::list<vectorized::VScannerSPtr>* s
int64_t version = 0;
std::from_chars(scan_range->version.data(),
scan_range->version.data() + scan_range->version.size(), version);
tablets.emplace_back(
TabletWithVersion {std::dynamic_pointer_cast<Tablet>(tablet), version});
tablets.emplace_back(TabletWithVersion {std::move(tablet), version});
}

if (config::is_cloud_mode()) {
std::vector<std::function<Status()>> tasks;
tasks.reserve(tablets.size());
for (auto&& [tablet, version] : tablets) {
tasks.emplace_back([tablet, version]() {
return std::dynamic_pointer_cast<CloudTablet>(tablet)->sync_rowsets(version);
});
}
RETURN_IF_ERROR(cloud::bthread_fork_join(tasks, 10));
}

if (is_dup_mow_key) {
Expand Down

0 comments on commit fb2c401

Please sign in to comment.