Skip to content

Commit

Permalink
[fix](load) fix race condition problem when insert commitinfo (apache…
Browse files Browse the repository at this point in the history
…#20823)

Signed-off-by: freemandealer <[email protected]>
  • Loading branch information
freemandealer committed Jun 15, 2023
1 parent 71e8cb0 commit 4bf15b9
Show file tree
Hide file tree
Showing 2 changed files with 13 additions and 11 deletions.
15 changes: 8 additions & 7 deletions be/src/io/fs/multi_table_pipe.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -99,7 +99,6 @@ Status MultiTablePipe::dispatch(const std::string& table, const char* data, size
auto iter = _planned_pipes.find(table);
if (iter != _planned_pipes.end()) {
pipe = iter->second;
LOG(INFO) << "dispatch for planned pipe: " << pipe.get();
RETURN_NOT_OK_STATUS_WITH_WARN((pipe.get()->*cb)(data, size),
"append failed in planned kafka pipe");
} else {
Expand All @@ -111,7 +110,6 @@ Status MultiTablePipe::dispatch(const std::string& table, const char* data, size
} else {
pipe = iter->second;
}
LOG(INFO) << "dispatch for unplanned pipe: " << pipe.get();
RETURN_NOT_OK_STATUS_WITH_WARN((pipe.get()->*cb)(data, size),
"append failed in unplanned kafka pipe");

Expand Down Expand Up @@ -187,20 +185,22 @@ Status MultiTablePipe::request_and_exec_plans() {
_ctx->multi_table_put_result.params.size());
_unplanned_pipes.clear();

_inflight_plan_cnt += _ctx->multi_table_put_result.params.size();
for (auto& plan : _ctx->multi_table_put_result.params) {
// TODO: use pipeline in the future (currently is buggy for load)
++_inflight_plan_cnt;
DCHECK_EQ(plan.__isset.table_name, true);
DCHECK(_planned_pipes.find(plan.table_name) != _planned_pipes.end());
putPipe(plan.params.fragment_instance_id, _planned_pipes[plan.table_name]);
LOG(INFO) << "fragment_instance_id=" << plan.params.fragment_instance_id
<< " table=" << plan.table_name;
exec_env->fragment_mgr()->exec_plan_fragment(plan, [this](RuntimeState* state,
Status* status) {
--_inflight_plan_cnt;
_tablet_commit_infos.insert(_tablet_commit_infos.end(),
state->tablet_commit_infos().begin(),
state->tablet_commit_infos().end());
{
std::lock_guard<std::mutex> l(_tablet_commit_infos_lock);
_tablet_commit_infos.insert(_tablet_commit_infos.end(),
state->tablet_commit_infos().begin(),
state->tablet_commit_infos().end());
}
_number_total_rows += state->num_rows_load_total();
_number_loaded_rows += state->num_rows_load_success();
_number_filtered_rows += state->num_rows_load_filtered();
Expand All @@ -224,6 +224,7 @@ Status MultiTablePipe::request_and_exec_plans() {
_status = *status;
}

--_inflight_plan_cnt;
if (_inflight_plan_cnt == 0 && is_consume_finished()) {
_ctx->number_total_rows = _number_total_rows;
_ctx->number_loaded_rows = _number_loaded_rows;
Expand Down
9 changes: 5 additions & 4 deletions be/src/io/fs/multi_table_pipe.h
Original file line number Diff line number Diff line change
Expand Up @@ -77,11 +77,12 @@ class MultiTablePipe : public KafkaConsumerPipe {
std::shared_ptr<StreamLoadContext> _ctx;
Status _status; // save the first error status of all executing plan fragment
#ifndef BE_TEST
std::mutex _tablet_commit_infos_lock;
std::vector<TTabletCommitInfo> _tablet_commit_infos; // collect from each plan fragment
int64_t _number_total_rows = 0;
int64_t _number_loaded_rows = 0;
int64_t _number_filtered_rows = 0;
int64_t _number_unselected_rows = 0;
std::atomic<int64_t> _number_total_rows {0};
std::atomic<int64_t> _number_loaded_rows {0};
std::atomic<int64_t> _number_filtered_rows {0};
std::atomic<int64_t> _number_unselected_rows {0};
#endif
std::mutex _pipe_map_lock;
std::unordered_map<TUniqueId /*instance id*/, std::shared_ptr<io::StreamLoadPipe>> _pipe_map;
Expand Down

0 comments on commit 4bf15b9

Please sign in to comment.