Skip to content

Commit

Permalink
end Kafka consume when meets partition EOF
Browse files Browse the repository at this point in the history
  • Loading branch information
sollhui committed Mar 11, 2024
1 parent fb2c401 commit 98fa695
Show file tree
Hide file tree
Showing 2 changed files with 15 additions and 1 deletion.
15 changes: 14 additions & 1 deletion be/src/runtime/routine_load/data_consumer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -80,7 +80,7 @@ Status KafkaDataConsumer::init(std::shared_ptr<StreamLoadContext> ctx) {
};

RETURN_IF_ERROR(set_conf("metadata.broker.list", ctx->kafka_info->brokers));
RETURN_IF_ERROR(set_conf("enable.partition.eof", "false"));
RETURN_IF_ERROR(set_conf("enable.partition.eof", "true"));
RETURN_IF_ERROR(set_conf("enable.auto.offset.store", "false"));
// TODO: set it larger than 0 after we set rd_kafka_conf_set_stats_cb()
RETURN_IF_ERROR(set_conf("statistics.interval.ms", "0"));
Expand Down Expand Up @@ -162,6 +162,7 @@ Status KafkaDataConsumer::assign_topic_partitions(
RdKafka::TopicPartition* tp1 =
RdKafka::TopicPartition::create(topic, entry.first, entry.second);
topic_partitions.push_back(tp1);
_eof_partition_ids.insert(entry.first);
ss << "[" << entry.first << ": " << entry.second << "] ";
}

Expand Down Expand Up @@ -219,6 +220,9 @@ Status KafkaDataConsumer::group_consume(BlockingQueue<RdKafka::Message*>* queue,
consumer_watch.stop();
switch (msg->err()) {
case RdKafka::ERR_NO_ERROR:
if (_eof_partition_ids.count(msg->partition()) <= 0) {
_eof_partition_ids.insert(msg->partition());
}
if (msg->len() == 0) {
// ignore msg with length 0.
// put empty msg into queue will cause the load process shutting down.
Expand All @@ -245,6 +249,15 @@ Status KafkaDataConsumer::group_consume(BlockingQueue<RdKafka::Message*>* queue,
break;
}
[[fallthrough]];
case RdKafka::ERR__PARTITION_EOF: {
LOG(INFO) << "consumer meet partition eof: " << _id
<< " partition offset: " << msg->offset();
_eof_partition_ids.erase(msg->partition());
if (_eof_partition_ids.size() <= 0) {
done = true;
}
break;
}
default:
LOG(WARNING) << "kafka consume failed: " << _id << ", msg: " << msg->errstr();
done = true;
Expand Down
1 change: 1 addition & 0 deletions be/src/runtime/routine_load/data_consumer.h
Original file line number Diff line number Diff line change
Expand Up @@ -160,6 +160,7 @@ class KafkaDataConsumer : public DataConsumer {
std::string _brokers;
std::string _topic;
std::unordered_map<std::string, std::string> _custom_properties;
std::set<int32_t> _eof_partition_ids;

KafkaEventCb _k_event_cb;
RdKafka::KafkaConsumer* _k_consumer = nullptr;
Expand Down

0 comments on commit 98fa695

Please sign in to comment.