Skip to content

Commit

Permalink
[Fix](hive-writer) Fix hive partition update core.
Browse files Browse the repository at this point in the history
  • Loading branch information
kaka11chen committed May 23, 2024
1 parent 8502675 commit e24fd30
Show file tree
Hide file tree
Showing 4 changed files with 80 additions and 33 deletions.
45 changes: 33 additions & 12 deletions be/src/vec/sink/writer/vhive_partition_writer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,19 @@ VHivePartitionWriter::VHivePartitionWriter(
_hadoop_conf(hadoop_conf) {}

// Opens this partition writer, guarded by the atomic _opened flag.
//
// Returns an InternalError if the flag is already set. Otherwise returns the
// status produced by _open_internal(); when that status is not OK the flag is
// cleared again before returning.
Status VHivePartitionWriter::open(RuntimeState* state, RuntimeProfile* profile) {
    // Atomically flip _opened from false to true; if the exchange fails the
    // flag was already true and this call must not run the open logic.
    bool not_opened = false;
    if (!_opened.compare_exchange_strong(not_opened, true)) {
        return Status::InternalError("VHivePartitionWriter is already opened.");
    }

    Status open_status = _open_internal(state, profile);
    if (!open_status.ok()) {
        // Undo the flag so the writer is not left marked as opened.
        _opened.store(false);
    }
    return open_status;
}

Status VHivePartitionWriter::_open_internal(RuntimeState* state, RuntimeProfile* profile) {
_state = state;

io::FSPropertiesRef fs_properties(_write_info.file_type);
Expand Down Expand Up @@ -147,21 +160,29 @@ Status VHivePartitionWriter::open(RuntimeState* state, RuntimeProfile* profile)
}

// Closes this partition writer, guarded by the atomic _opened flag.
//
// The work below runs only when _opened can be flipped from true to false;
// otherwise a warning is logged and nothing else happens.
//
// `status` is the outcome of the write that preceded this call:
//   - not OK: the file at "<write_path>/<file_name>" is deleted through _fs
//     (when _fs is set);
//   - OK: a partition update built by _build_partition_update() is appended
//     to _state->hive_partition_updates().
//
// Failures from closing the transformer or deleting the file are only logged;
// this function always returns Status::OK().
Status VHivePartitionWriter::close(const Status& status) {
    bool expected = true;
    if (_opened.compare_exchange_strong(expected, false)) {
        if (_file_format_transformer != nullptr) {
            Status st = _file_format_transformer->close();
            if (!st.ok()) {
                LOG(WARNING) << fmt::format("_file_format_transformer close failed, reason: {}",
                                            st.to_string());
            }
        }
        if (!status.ok() && _fs != nullptr) {
            auto path = fmt::format("{}/{}", _write_info.write_path, _file_name);
            Status st = _fs->delete_file(path);
            if (!st.ok()) {
                LOG(WARNING) << fmt::format("Delete file {} failed, reason: {}", path,
                                            st.to_string());
            }
        }
        // Record the partition update only for a successful write.
        if (status.ok()) {
            _state->hive_partition_updates().emplace_back(_build_partition_update());
        }
    } else {
        LOG(WARNING) << "VHivePartitionWriter is not opened or already closed.";
    }
    return Status::OK();
}

Expand Down
4 changes: 4 additions & 0 deletions be/src/vec/sink/writer/vhive_partition_writer.h
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,8 @@ class VHivePartitionWriter {
std::string _get_target_file_name();

private:
Status _open_internal(RuntimeState* state, RuntimeProfile* profile);

Status _projection_and_filter_block(doris::vectorized::Block& input_block,
const vectorized::IColumn::Filter* filter,
doris::vectorized::Block* output_block);
Expand Down Expand Up @@ -114,6 +116,8 @@ class VHivePartitionWriter {
std::unique_ptr<VFileFormatTransformer> _file_format_transformer = nullptr;

RuntimeState* _state;

std::atomic<bool> _opened = false;
};
} // namespace vectorized
} // namespace doris
60 changes: 39 additions & 21 deletions be/src/vec/sink/writer/vhive_table_writer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,19 @@ Status VHiveTableWriter::init_properties(ObjectPool* pool) {
}

// Opens the table writer, guarded by the atomic _opened flag.
//
// Returns an InternalError if the flag is already set. Otherwise returns the
// status produced by _open_internal(); when that status is not OK the flag is
// cleared again before returning.
Status VHiveTableWriter::open(RuntimeState* state, RuntimeProfile* profile) {
    // Atomically flip _opened from false to true; if the exchange fails the
    // flag was already true and this call must not run the open logic.
    bool not_opened = false;
    if (!_opened.compare_exchange_strong(not_opened, true)) {
        return Status::InternalError("VHiveTableWriter is already opened.");
    }

    Status open_status = _open_internal(state, profile);
    if (!open_status.ok()) {
        // Undo the flag so the writer is not left marked as opened.
        _opened.store(false);
    }
    return open_status;
}

Status VHiveTableWriter::_open_internal(RuntimeState* state, RuntimeProfile* profile) {
_state = state;
_profile = profile;

Expand Down Expand Up @@ -202,28 +215,34 @@ Status VHiveTableWriter::write(vectorized::Block& block) {
}

// Closes the table writer, guarded by the atomic _opened flag.
//
// The work below runs only when _opened can be flipped from true to false;
// otherwise a warning is logged and nothing else happens.
//
// Every writer in _partitions_to_writers is closed with `status`, then the
// map is cleared. A failure from an individual partition writer is logged and
// does not stop the remaining writers from being closed. When `status` is OK
// the profile counters are set from the accumulated values.
//
// This function always returns Status::OK().
Status VHiveTableWriter::close(Status status) {
    bool expected = true;
    if (_opened.compare_exchange_strong(expected, false)) {
        // Capture the writer count before the map is cleared; it is reported
        // through _partition_writers_count below.
        int64_t partitions_to_writers_size = _partitions_to_writers.size();
        {
            SCOPED_RAW_TIMER(&_close_ns);
            for (const auto& pair : _partitions_to_writers) {
                Status st = pair.second->close(status);
                if (!st.ok()) {
                    // The placeholder is filled with the failure status text.
                    LOG(WARNING) << fmt::format("partition writer close failed for partition {}",
                                                st.to_string());
                }
            }
            _partitions_to_writers.clear();
        }
        if (status.ok()) {
            SCOPED_TIMER(_profile->total_time_counter());

            COUNTER_SET(_written_rows_counter, static_cast<int64_t>(_row_count));
            COUNTER_SET(_send_data_timer, _send_data_ns);
            COUNTER_SET(_partition_writers_dispatch_timer, _partition_writers_dispatch_ns);
            COUNTER_SET(_partition_writers_write_timer, _partition_writers_write_ns);
            COUNTER_SET(_partition_writers_count, partitions_to_writers_size);
            COUNTER_SET(_close_timer, _close_ns);
            COUNTER_SET(_write_file_counter, _write_file_count);
        }
    } else {
        LOG(WARNING) << "VHiveTableWriter is not opened or already closed.";
    }
    return Status::OK();
}
Expand Down Expand Up @@ -414,7 +433,6 @@ std::string VHiveTableWriter::_to_partition_value(const TypeDescriptor& type_des
char buf[64];
char* pos = value.to_string(buf);
return std::string(buf, pos - buf - 1);
break;
}
case TYPE_DATEV2: {
DateV2Value<DateV2ValueType> value =
Expand Down
4 changes: 4 additions & 0 deletions be/src/vec/sink/writer/vhive_table_writer.h
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,8 @@ class VHiveTableWriter final : public AsyncResultWriter {
Status close(Status) override;

private:
Status _open_internal(RuntimeState* state, RuntimeProfile* profile);

std::shared_ptr<VHivePartitionWriter> _create_partition_writer(
vectorized::Block& block, int position, const std::string* file_name = nullptr,
int file_name_index = 0);
Expand All @@ -74,6 +76,8 @@ class VHiveTableWriter final : public AsyncResultWriter {

size_t _row_count = 0;

std::atomic<bool> _opened = false;

// profile counters
int64_t _send_data_ns = 0;
int64_t _partition_writers_dispatch_ns = 0;
Expand Down

0 comments on commit e24fd30

Please sign in to comment.