Skip to content

Commit

Permalink
[fix](brpc_client_cache) resolve hostname in DNS cache before passing…
Browse files Browse the repository at this point in the history
… to brpc (apache#40074)

Currently brpc does not support resloving IPv6 hostnames, errors will be
returned on `brpc::Channel::Init`.
The brpc client cache may return `nullptr` on its `get_client` or
`get_new_client_no_cache` APIs.

This PR made the following changes:

1. Resolve hostnames from DNS cache before passing it to brpc.
2. Callers should check nullptr after get client, in case of failures.
  • Loading branch information
kaijchen committed Sep 11, 2024
1 parent 81bbd65 commit 91afeec
Show file tree
Hide file tree
Showing 11 changed files with 112 additions and 23 deletions.
14 changes: 13 additions & 1 deletion be/src/cloud/cloud_backend_service.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -151,7 +151,19 @@ void CloudBackendService::warm_up_tablets(TWarmUpTabletsResponse& response,

void CloudBackendService::warm_up_cache_async(TWarmUpCacheAsyncResponse& response,
const TWarmUpCacheAsyncRequest& request) {
std::string brpc_addr = fmt::format("{}:{}", request.host, request.brpc_port);
std::string host = request.host;
auto dns_cache = ExecEnv::GetInstance()->dns_cache();
if (dns_cache == nullptr) {
LOG(WARNING) << "DNS cache is not initialized, skipping hostname resolve";
} else if (!is_valid_ip(request.host)) {
Status status = dns_cache->get(request.host, &host);
if (!status.ok()) {
LOG(WARNING) << "failed to get ip from host " << request.host << ": "
<< status.to_string();
return;
}
}
std::string brpc_addr = get_host_port(host, request.brpc_port);
Status st = Status::OK();
TStatus t_status;
std::shared_ptr<PBackendService_Stub> brpc_stub =
Expand Down
3 changes: 2 additions & 1 deletion be/src/exec/rowid_fetcher.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -80,7 +80,8 @@ Status RowIDFetcher::init() {
if (!client) {
LOG(WARNING) << "Get rpc stub failed, host=" << node_info.host
<< ", port=" << node_info.brpc_port;
return Status::InternalError("RowIDFetcher failed to init rpc client");
return Status::InternalError("RowIDFetcher failed to init rpc client, host={}, port={}",
node_info.host, node_info.brpc_port);
}
_stubs.push_back(client);
}
Expand Down
7 changes: 3 additions & 4 deletions be/src/exprs/runtime_filter.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1123,9 +1123,8 @@ Status IRuntimeFilter::send_filter_size(RuntimeState* state, uint64_t local_filt
std::shared_ptr<PBackendService_Stub> stub(
_state->exec_env->brpc_internal_client_cache()->get_client(addr));
if (!stub) {
std::string msg =
fmt::format("Get rpc stub failed, host={}, port=", addr.hostname, addr.port);
return Status::InternalError(msg);
return Status::InternalError("Get rpc stub failed, host={}, port={}", addr.hostname,
addr.port);
}

auto request = std::make_shared<PSendFilterSizeRequest>();
Expand Down Expand Up @@ -1158,7 +1157,7 @@ Status IRuntimeFilter::push_to_remote(const TNetworkAddress* addr) {
_state->exec_env->brpc_internal_client_cache()->get_client(*addr));
if (!stub) {
return Status::InternalError(
fmt::format("Get rpc stub failed, host={}, port=", addr->hostname, addr->port));
fmt::format("Get rpc stub failed, host={}, port={}", addr->hostname, addr->port));
}

auto merge_filter_request = std::make_shared<PMergeFilterRequest>();
Expand Down
2 changes: 1 addition & 1 deletion be/src/olap/single_replica_compaction.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -170,7 +170,7 @@ Status SingleReplicaCompaction::_get_rowset_verisons_from_peer(
ExecEnv::GetInstance()->brpc_internal_client_cache()->get_client(addr.host,
addr.brpc_port);
if (stub == nullptr) {
return Status::Aborted("get rpc stub failed");
return Status::Aborted("get rpc stub failed, host={}, port={}", addr.host, addr.brpc_port);
}

brpc::Controller cntl;
Expand Down
18 changes: 16 additions & 2 deletions be/src/runtime/runtime_filter_mgr.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -324,10 +324,17 @@ Status RuntimeFilterMergeControllerEntity::send_filter_size(const PSendFilterSiz
cnt_val->global_size += request->filter_size();
cnt_val->source_addrs.push_back(request->source_addr());

Status st = Status::OK();
if (cnt_val->source_addrs.size() == cnt_val->producer_size) {
for (auto addr : cnt_val->source_addrs) {
std::shared_ptr<PBackendService_Stub> stub(
ExecEnv::GetInstance()->brpc_internal_client_cache()->get_client(addr));
if (stub == nullptr) {
LOG(WARNING) << "Failed to init rpc to " << addr.hostname() << ":" << addr.port();
st = Status::InternalError("Failed to init rpc to {}:{}", addr.hostname(),
addr.port());
continue;
}

auto closure = AutoReleaseClosure<PSyncFilterSizeRequest,
DummyBrpcCallback<PSyncFilterSizeResponse>>::
Expand All @@ -347,7 +354,7 @@ Status RuntimeFilterMergeControllerEntity::send_filter_size(const PSendFilterSiz
closure.release();
}
}
return Status::OK();
return st;
}

Status RuntimeFilterMgr::sync_filter_size(const PSyncFilterSizeRequest* request) {
Expand Down Expand Up @@ -376,6 +383,7 @@ Status RuntimeFilterMergeControllerEntity::merge(const PMergeFilterRequest* requ
int64_t start_merge = MonotonicMillis();
auto filter_id = request->filter_id();
std::map<int, CntlValwithLock>::iterator iter;
Status st = Status::OK();
{
std::shared_lock<std::shared_mutex> guard(_filter_map_mutex);
iter = _filter_map.find(filter_id);
Expand Down Expand Up @@ -460,14 +468,20 @@ Status RuntimeFilterMergeControllerEntity::merge(const PMergeFilterRequest* requ
ExecEnv::GetInstance()->brpc_internal_client_cache()->get_client(
target.target_fragment_instance_addr));
if (stub == nullptr) {
LOG(WARNING) << "Failed to init rpc to "
<< target.target_fragment_instance_addr.hostname << ":"
<< target.target_fragment_instance_addr.port;
st = Status::InternalError("Failed to init rpc to {}:{}",
target.target_fragment_instance_addr.hostname,
target.target_fragment_instance_addr.port);
continue;
}
stub->apply_filterv2(closure->cntl_.get(), closure->request_.get(),
closure->response_.get(), closure.get());
closure.release();
}
}
return Status::OK();
return st;
}

Status RuntimeFilterMergeController::acquire(
Expand Down
5 changes: 5 additions & 0 deletions be/src/service/internal_service.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1093,6 +1093,11 @@ void PInternalService::fetch_remote_tablet_schema(google::protobuf::RpcControlle
std::shared_ptr<PBackendService_Stub> stub(
ExecEnv::GetInstance()->brpc_internal_client_cache()->get_client(
host, brpc_port));
if (stub == nullptr) {
LOG(WARNING) << "Failed to init rpc to " << host << ":" << brpc_port;
st = Status::InternalError("Failed to init rpc to {}:{}", host, brpc_port);
continue;
}
rpc_contexts[i].cid = rpc_contexts[i].cntl.call_id();
rpc_contexts[i].cntl.set_timeout_ms(config::fetch_remote_schema_rpc_timeout_ms);
stub->fetch_remote_tablet_schema(&rpc_contexts[i].cntl, &remote_request,
Expand Down
35 changes: 25 additions & 10 deletions be/src/util/brpc_client_cache.h
Original file line number Diff line number Diff line change
Expand Up @@ -83,33 +83,47 @@ class BrpcClientCache {
}

std::shared_ptr<T> get_client(const std::string& host, int port) {
std::string realhost;
realhost = host;
if (!is_valid_ip(host)) {
Status status = ExecEnv::GetInstance()->dns_cache()->get(host, &realhost);
std::string realhost = host;
auto dns_cache = ExecEnv::GetInstance()->dns_cache();
if (dns_cache == nullptr) {
LOG(WARNING) << "DNS cache is not initialized, skipping hostname resolve";
} else if (!is_valid_ip(host)) {
Status status = dns_cache->get(host, &realhost);
if (!status.ok()) {
LOG(WARNING) << "failed to get ip from host:" << status.to_string();
return nullptr;
}
}
std::string host_port = get_host_port(realhost, port);
return get_client(host_port);
}

std::shared_ptr<T> get_client(const std::string& host_port) {
std::shared_ptr<T> stub_ptr;
auto get_value = [&stub_ptr](const auto& v) { stub_ptr = v.second; };
if (LIKELY(_stub_map.if_contains(host_port, get_value))) {
DCHECK(stub_ptr != nullptr);
return stub_ptr;
}

// new one stub and insert into map
auto stub = get_new_client_no_cache(host_port);
_stub_map.try_emplace_l(
host_port, [&stub](const auto& v) { stub = v.second; }, stub);
if (stub != nullptr) {
_stub_map.try_emplace_l(
host_port, [&stub](const auto& v) { stub = v.second; }, stub);
}
return stub;
}

std::shared_ptr<T> get_client(const std::string& host_port) {
int pos = host_port.rfind(':');
std::string host = host_port.substr(0, pos);
int port = 0;
try {
port = stoi(host_port.substr(pos + 1));
} catch (const std::exception& err) {
LOG(WARNING) << "failed to parse port from " << host_port << ": " << err.what();
return nullptr;
}
return get_client(host, port);
}

std::shared_ptr<T> get_new_client_no_cache(const std::string& host_port,
const std::string& protocol = "",
const std::string& connection_type = "",
Expand Down Expand Up @@ -143,6 +157,7 @@ class BrpcClientCache {
channel->Init(host_port.c_str(), config::rpc_load_balancer.c_str(), &options);
}
if (ret_code) {
LOG(WARNING) << "Failed to initialize brpc Channel to " << host_port;
return nullptr;
}
return std::make_shared<T>(channel.release(), google::protobuf::Service::STUB_OWNS_CHANNEL);
Expand Down
15 changes: 15 additions & 0 deletions be/src/util/proto_util.h
Original file line number Diff line number Diff line change
Expand Up @@ -71,11 +71,26 @@ Status transmit_block_httpv2(ExecEnv* exec_env, std::unique_ptr<Closure> closure
TNetworkAddress brpc_dest_addr) {
RETURN_IF_ERROR(request_embed_attachment_contain_blockv2(closure->request_.get(), closure));

std::string host = brpc_dest_addr.hostname;
auto dns_cache = ExecEnv::GetInstance()->dns_cache();
if (dns_cache == nullptr) {
LOG(WARNING) << "DNS cache is not initialized, skipping hostname resolve";
} else if (!is_valid_ip(brpc_dest_addr.hostname)) {
Status status = dns_cache->get(brpc_dest_addr.hostname, &host);
if (!status.ok()) {
LOG(WARNING) << "failed to get ip from host " << brpc_dest_addr.hostname << ": "
<< status.to_string();
return Status::InternalError("failed to get ip from host {}", brpc_dest_addr.hostname);
}
}
//format an ipv6 address
std::string brpc_url = get_brpc_http_url(brpc_dest_addr.hostname, brpc_dest_addr.port);

std::shared_ptr<PBackendService_Stub> brpc_http_stub =
exec_env->brpc_internal_client_cache()->get_new_client_no_cache(brpc_url, "http");
if (brpc_http_stub == nullptr) {
return Status::InternalError("failed to open brpc http client to {}", brpc_url);
}
closure->cntl_->http_request().uri() =
brpc_url + "/PInternalServiceImpl/transmit_block_by_http";
closure->cntl_->http_request().set_method(brpc::HTTP_METHOD_POST);
Expand Down
5 changes: 5 additions & 0 deletions be/src/vec/functions/function_rpc.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,11 @@ Status RPCFnImpl::vec_call(FunctionContext* context, Block& block, const ColumnN
size_t result, size_t input_rows_count) {
PFunctionCallRequest request;
PFunctionCallResponse response;
if (_client == nullptr) {
return Status::InternalError(
"call to rpc function {} failed: init rpc error, server addr = {}", _signature,
_server_addr);
}
request.set_function_name(_function_name);
RETURN_IF_ERROR(_convert_block_to_proto(block, arguments, input_rows_count, &request));
brpc::Controller cntl;
Expand Down
10 changes: 7 additions & 3 deletions be/src/vec/sink/load_stream_stub.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -152,7 +152,6 @@ Status LoadStreamStub::open(BrpcClientCache<PBackendService_Stub>* client_cache,
return _init_st;
}
_dst_id = node_info.id;
std::string host_port = get_host_port(node_info.host, node_info.brpc_port);
brpc::StreamOptions opt;
opt.max_buf_size = config::load_stream_max_buf_size;
opt.idle_timeout_ms = idle_timeout_ms;
Expand Down Expand Up @@ -185,7 +184,11 @@ Status LoadStreamStub::open(BrpcClientCache<PBackendService_Stub>* client_cache,
}
POpenLoadStreamResponse response;
// set connection_group "streaming" to distinguish with non-streaming connections
const auto& stub = client_cache->get_client(host_port);
const auto& stub = client_cache->get_client(node_info.host, node_info.brpc_port);
if (stub == nullptr) {
return Status::InternalError("failed to init brpc client to {}:{}", node_info.host,
node_info.brpc_port);
}
stub->open_load_stream(&cntl, &request, &response, nullptr);
for (const auto& resp : response.tablet_schemas()) {
auto tablet_schema = std::make_unique<TabletSchema>();
Expand All @@ -200,7 +203,8 @@ Status LoadStreamStub::open(BrpcClientCache<PBackendService_Stub>* client_cache,
cntl.ErrorText());
return _init_st;
}
LOG(INFO) << "open load stream to " << host_port << ", " << *this;
LOG(INFO) << "open load stream to host=" << node_info.host << ", port=" << node_info.brpc_port
<< ", " << *this;
_is_init.store(true);
return Status::OK();
}
Expand Down
21 changes: 20 additions & 1 deletion be/src/vec/sink/writer/vtablet_writer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -717,11 +717,30 @@ void VNodeChannel::try_send_pending_block(RuntimeState* state) {
return;
}

std::string host = _node_info.host;
auto dns_cache = ExecEnv::GetInstance()->dns_cache();
if (dns_cache == nullptr) {
LOG(WARNING) << "DNS cache is not initialized, skipping hostname resolve";
} else if (!is_valid_ip(_node_info.host)) {
Status status = dns_cache->get(_node_info.host, &host);
if (!status.ok()) {
LOG(WARNING) << "failed to get ip from host " << _node_info.host << ": "
<< status.to_string();
_send_block_callback->clear_in_flight();
return;
}
}
//format an ipv6 address
std::string brpc_url = get_brpc_http_url(_node_info.host, _node_info.brpc_port);
std::string brpc_url = get_brpc_http_url(host, _node_info.brpc_port);
std::shared_ptr<PBackendService_Stub> _brpc_http_stub =
_state->exec_env()->brpc_internal_client_cache()->get_new_client_no_cache(brpc_url,
"http");
if (_brpc_http_stub == nullptr) {
cancel(fmt::format("{}, failed to open brpc http client to {}", channel_info(),
brpc_url));
_send_block_callback->clear_in_flight();
return;
}
_send_block_callback->cntl_->http_request().uri() =
brpc_url + "/PInternalServiceImpl/tablet_writer_add_block_by_http";
_send_block_callback->cntl_->http_request().set_method(brpc::HTTP_METHOD_POST);
Expand Down

0 comments on commit 91afeec

Please sign in to comment.