Skip to content

Commit

Permalink
[Fix](parquet-reader) Fix and optimize parquet min-max filtering. (ap…
Browse files Browse the repository at this point in the history
…ache#38277)

## Proposed changes

Refer to trino's implementation

- Some bugs in the historical version paquet-mr. Use
`CorruptStatistics::should_ignore_statistics()` to handle.

- The old version of parquet uses `min` and `max` stats, and later
implements `min_value` and `max_value`. `Min`/`max` stats cannot be used
for some types and in some cases. This is related to the comparison and
sorting method of values.

- If it is double or float, special cases such as NaN, -0, and 0 must be
handled.

- If the string type only has min and max stats, but no min_value or
max_value, use `ParquetPredicate::_try_read_old_utf8_stats()` to expand
the range reading optimization method for optimization.
  • Loading branch information
kaka11chen committed Aug 12, 2024
1 parent 1810cba commit 433b84a
Show file tree
Hide file tree
Showing 9 changed files with 1,207 additions and 26 deletions.
340 changes: 340 additions & 0 deletions be/src/vec/exec/format/parquet/parquet_common.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -162,4 +162,344 @@ bool ColumnSelectVector::can_filter_all(size_t remaining_num_values) {
void ColumnSelectVector::skip(size_t num_values) {
_filter_map_index += num_values;
}

ParsedVersion::ParsedVersion(std::string application, std::optional<std::string> version,
std::optional<std::string> app_build_hash)
: _application(std::move(application)),
_version(std::move(version)),
_app_build_hash(std::move(app_build_hash)) {}

bool ParsedVersion::operator==(const ParsedVersion& other) const {
return _application == other._application && _version == other._version &&
_app_build_hash == other._app_build_hash;
}

bool ParsedVersion::operator!=(const ParsedVersion& other) const {
return !(*this == other);
}

size_t ParsedVersion::hash() const {
std::hash<std::string> hasher;
return hasher(_application) ^ (_version ? hasher(*_version) : 0) ^
(_app_build_hash ? hasher(*_app_build_hash) : 0);
}

std::string ParsedVersion::to_string() const {
return "ParsedVersion(application=" + _application +
", semver=" + (_version ? *_version : "null") +
", app_build_hash=" + (_app_build_hash ? *_app_build_hash : "null") + ")";
}

Status VersionParser::parse(const std::string& created_by,
std::unique_ptr<ParsedVersion>* parsed_version) {
static const std::string FORMAT =
"(.*?)\\s+version\\s*(?:([^(]*?)\\s*(?:\\(\\s*build\\s*([^)]*?)\\s*\\))?)?";
static const std::regex PATTERN(FORMAT);

std::smatch matcher;
if (!std::regex_match(created_by, matcher, PATTERN)) {
return Status::InternalError(fmt::format("Could not parse created_by: {}, using format: {}",
created_by, FORMAT));
}

std::string application = matcher[1].str();
if (application.empty()) {
return Status::InternalError("application cannot be null or empty");
}
std::optional<std::string> semver =
matcher[2].str().empty() ? std::nullopt : std::optional<std::string>(matcher[2].str());
std::optional<std::string> app_build_hash =
matcher[3].str().empty() ? std::nullopt : std::optional<std::string>(matcher[3].str());
*parsed_version = std::make_unique<ParsedVersion>(application, semver, app_build_hash);
return Status::OK();
}

SemanticVersion::SemanticVersion(int major, int minor, int patch)
: _major(major),
_minor(minor),
_patch(patch),
_prerelease(false),
_unknown(std::nullopt),
_pre(std::nullopt),
_build_info(std::nullopt) {}

#ifdef BE_TEST
SemanticVersion::SemanticVersion(int major, int minor, int patch, bool has_unknown)
: _major(major),
_minor(minor),
_patch(patch),
_prerelease(has_unknown),
_unknown(std::nullopt),
_pre(std::nullopt),
_build_info(std::nullopt) {}
#endif

SemanticVersion::SemanticVersion(int major, int minor, int patch,
std::optional<std::string> unknown, std::optional<std::string> pre,
std::optional<std::string> build_info)
: _major(major),
_minor(minor),
_patch(patch),
_prerelease(unknown.has_value() && !unknown.value().empty()),
_unknown(std::move(unknown)),
_pre(pre.has_value() ? std::optional<Prerelease>(Prerelease(std::move(pre.value())))
: std::nullopt),
_build_info(std::move(build_info)) {}

Status SemanticVersion::parse(const std::string& version,
std::unique_ptr<SemanticVersion>* semantic_version) {
static const std::regex pattern(R"(^(\d+)\.(\d+)\.(\d+)([^-+]*)?(?:-([^+]*))?(?:\+(.*))?$)");
std::smatch match;

if (!std::regex_match(version, match, pattern)) {
return Status::InternalError(version + " does not match format");
}

int major = std::stoi(match[1].str());
int minor = std::stoi(match[2].str());
int patch = std::stoi(match[3].str());
std::optional<std::string> unknown =
match[4].str().empty() ? std::nullopt : std::optional<std::string>(match[4].str());
std::optional<std::string> prerelease =
match[5].str().empty() ? std::nullopt : std::optional<std::string>(match[5].str());
std::optional<std::string> build_info =
match[6].str().empty() ? std::nullopt : std::optional<std::string>(match[6].str());
if (major < 0 || minor < 0 || patch < 0) {
return Status::InternalError("major({}), minor({}), and patch({}) must all be >= 0", major,
minor, patch);
}
*semantic_version =
std::make_unique<SemanticVersion>(major, minor, patch, unknown, prerelease, build_info);
return Status::OK();
}

int SemanticVersion::compare_to(const SemanticVersion& other) const {
if (int cmp = _compare_integers(_major, other._major); cmp != 0) {
return cmp;
}
if (int cmp = _compare_integers(_minor, other._minor); cmp != 0) {
return cmp;
}
if (int cmp = _compare_integers(_patch, other._patch); cmp != 0) {
return cmp;
}
if (int cmp = _compare_booleans(other._prerelease, _prerelease); cmp != 0) {
return cmp;
}
if (_pre.has_value()) {
if (other._pre.has_value()) {
return _pre.value().compare_to(other._pre.value());
} else {
return -1;
}
} else if (other._pre.has_value()) {
return 1;
}
return 0;
}

bool SemanticVersion::operator==(const SemanticVersion& other) const {
return compare_to(other) == 0;
}

bool SemanticVersion::operator!=(const SemanticVersion& other) const {
return !(*this == other);
}

std::string SemanticVersion::to_string() const {
std::string result =
std::to_string(_major) + "." + std::to_string(_minor) + "." + std::to_string(_patch);
if (_prerelease && _unknown) result += _unknown.value();
if (_pre) result += _pre.value().to_string();
if (_build_info) result += _build_info.value();
return result;
}

SemanticVersion::NumberOrString::NumberOrString(const std::string& value_string)
: _original(value_string) {
const static std::regex NUMERIC("\\d+");
_is_numeric = std::regex_match(_original, NUMERIC);
_number = -1;
if (_is_numeric) {
_number = std::stoi(_original);
}
}

SemanticVersion::NumberOrString::NumberOrString(const NumberOrString& other)
: _original(other._original), _is_numeric(other._is_numeric), _number(other._number) {}

int SemanticVersion::NumberOrString::compare_to(const SemanticVersion::NumberOrString& that) const {
if (this->_is_numeric != that._is_numeric) {
return this->_is_numeric ? -1 : 1;
}

if (_is_numeric) {
return this->_number - that._number;
}

return this->_original.compare(that._original);
}

std::string SemanticVersion::NumberOrString::to_string() const {
return _original;
}

bool SemanticVersion::NumberOrString::operator<(const SemanticVersion::NumberOrString& that) const {
return compare_to(that) < 0;
}

bool SemanticVersion::NumberOrString::operator==(
const SemanticVersion::NumberOrString& that) const {
return compare_to(that) == 0;
}

bool SemanticVersion::NumberOrString::operator!=(
const SemanticVersion::NumberOrString& that) const {
return !(*this == that);
}

bool SemanticVersion::NumberOrString::operator>(const SemanticVersion::NumberOrString& that) const {
return compare_to(that) > 0;
}

bool SemanticVersion::NumberOrString::operator<=(
const SemanticVersion::NumberOrString& that) const {
return !(*this > that);
}

bool SemanticVersion::NumberOrString::operator>=(
const SemanticVersion::NumberOrString& that) const {
return !(*this < that);
}

int SemanticVersion::_compare_integers(int x, int y) {
return (x < y) ? -1 : ((x == y) ? 0 : 1);
}

int SemanticVersion::_compare_booleans(bool x, bool y) {
return (x == y) ? 0 : (x ? 1 : -1);
}

std::vector<std::string> SemanticVersion::Prerelease::_split(const std::string& s,
const std::regex& delimiter) {
std::sregex_token_iterator iter(s.begin(), s.end(), delimiter, -1);
std::sregex_token_iterator end;
std::vector<std::string> tokens(iter, end);
return tokens;
}

SemanticVersion::Prerelease::Prerelease(std::string original) : _original(std::move(original)) {
static const std::regex DOT("\\.");
auto parts = _split(_original, DOT);
for (const auto& part : parts) {
NumberOrString number_or_string(part);
_identifiers.emplace_back(number_or_string);
}
}

int SemanticVersion::Prerelease::compare_to(const Prerelease& that) const {
int size = std::min(this->_identifiers.size(), that._identifiers.size());
for (int i = 0; i < size; ++i) {
int cmp = this->_identifiers[i].compare_to(that._identifiers[i]);
if (cmp != 0) {
return cmp;
}
}
return static_cast<int>(this->_identifiers.size()) - static_cast<int>(that._identifiers.size());
}

std::string SemanticVersion::Prerelease::to_string() const {
return _original;
}

bool SemanticVersion::Prerelease::operator<(const Prerelease& that) const {
return compare_to(that) < 0;
}

bool SemanticVersion::Prerelease::operator==(const Prerelease& that) const {
return compare_to(that) == 0;
}

bool SemanticVersion::Prerelease::operator!=(const Prerelease& that) const {
return !(*this == that);
}

bool SemanticVersion::Prerelease::operator>(const Prerelease& that) const {
return compare_to(that) > 0;
}

bool SemanticVersion::Prerelease::operator<=(const Prerelease& that) const {
return !(*this > that);
}

bool SemanticVersion::Prerelease::operator>=(const Prerelease& that) const {
return !(*this < that);
}

const SemanticVersion CorruptStatistics::PARQUET_251_FIXED_VERSION(1, 8, 0);
const SemanticVersion CorruptStatistics::CDH_5_PARQUET_251_FIXED_START(1, 5, 0, std::nullopt,
"cdh5.5.0", std::nullopt);
const SemanticVersion CorruptStatistics::CDH_5_PARQUET_251_FIXED_END(1, 5, 0);

bool CorruptStatistics::should_ignore_statistics(const std::string& created_by,
tparquet::Type::type physical_type) {
if (physical_type != tparquet::Type::BYTE_ARRAY &&
physical_type != tparquet::Type::FIXED_LEN_BYTE_ARRAY) {
// The bug only applies to binary columns
return false;
}

if (created_by.empty()) {
// created_by is not populated
VLOG_DEBUG
<< "Ignoring statistics because created_by is null or empty! See PARQUET-251 and "
"PARQUET-297";
return true;
}

Status status;
std::unique_ptr<ParsedVersion> parsed_version;
status = VersionParser::parse(created_by, &parsed_version);
if (!status.ok()) {
VLOG_DEBUG << "Ignoring statistics because created_by could not be parsed (see "
"PARQUET-251)."
" CreatedBy: "
<< created_by << ", msg: " << status.msg();
return true;
}

if (parsed_version->application() != "parquet-mr") {
// Assume other applications don't have this bug
return false;
}

if ((!parsed_version->version().has_value()) || parsed_version->version().value().empty()) {
VLOG_DEBUG << "Ignoring statistics because created_by did not contain a semver (see "
"PARQUET-251): "
<< created_by;
return true;
}

std::unique_ptr<SemanticVersion> semantic_version;
status = SemanticVersion::parse(parsed_version->version().value(), &semantic_version);
if (!status.ok()) {
VLOG_DEBUG << "Ignoring statistics because created_by could not be parsed (see "
"PARQUET-251)."
" CreatedBy: "
<< created_by << ", msg: " << status.msg();
return true;
}
if (semantic_version->compare_to(PARQUET_251_FIXED_VERSION) < 0 &&
!(semantic_version->compare_to(CDH_5_PARQUET_251_FIXED_START) >= 0 &&
semantic_version->compare_to(CDH_5_PARQUET_251_FIXED_END) < 0)) {
VLOG_DEBUG
<< "Ignoring statistics because this file was created prior to the fixed version, "
"see PARQUET-251";
return true;
}

// This file was created after the fix
return false;
}

} // namespace doris::vectorized
Loading

0 comments on commit 433b84a

Please sign in to comment.