
Service Overview

Andrew Choi edited this page Nov 7, 2019 · 10 revisions

The most flexible way to start kafka-monitor is to run kafka-monitor-start.sh with a config file. This lets you instantiate multiple Service or App classes that are already implemented in Kafka Monitor and tune their configs to monitor your clusters. Each service has its own thread or scheduler to carry out predefined tasks, e.g. producing or consuming messages.

This section introduces the Service classes that have been implemented in Kafka Monitor. See Service Configuration for their configs.

ProduceService

ProduceService produces messages at a regular interval to each partition of a fabricated topic. By doing so, ProduceService is able to measure the availability of the produce service provided by Kafka as a fraction. Furthermore, the message payload contains an incrementing integer index and a timestamp, which ConsumeService can use to measure, e.g., end-to-end latency and message loss.

In order for ProduceService to measure the cluster's ProduceAvailability accurately, the topic should be created in advance with the same replication factor as most other topics, and the leadership of its partitions should ideally be evenly distributed across every broker in the cluster. Users can use TopicManagementService to make sure that the fabricated topic meets these requirements.

By default, ProduceService uses the new producer to produce messages. Users can implement a thin wrapper around their existing producer that implements the interface com.linkedin.kmf.producer.KMBaseProducer, in order to run ProduceService with their custom producer implementation.

In order to measure ProduceAvailability, ProduceService keeps track of the message produced_rate and error_rate. produced_rate is the rate of messages that are successfully sent. error_rate is the rate of exceptions caught when the producer produces messages. Both are measured over the past 60-second time window. Overall ProduceAvailability is measured as the average of per-partition ProduceAvailability. Per-partition ProduceAvailability is measured as produced_rate / (produced_rate + error_rate). If both produced_rate and error_rate are 0, ProduceService considers the ProduceAvailability of this partition to be 0 if produce.treat.zero.throughput.as.unavailable is set to true, which is the default config value.
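The calculation described above can be sketched in a few lines of Python. This is only an illustration of the arithmetic, not Kafka Monitor's own code: the function names and the `rates` mapping are hypothetical, and the rates are assumed to have already been measured over the 60-second window.

```python
from typing import Mapping, Tuple


def partition_availability(produced_rate: float, error_rate: float,
                           treat_zero_throughput_as_unavailable: bool = True) -> float:
    """Per-partition availability: produced_rate / (produced_rate + error_rate)."""
    total = produced_rate + error_rate
    if total == 0:
        # No successful sends and no errors in the window: the result
        # depends on produce.treat.zero.throughput.as.unavailable.
        return 0.0 if treat_zero_throughput_as_unavailable else 1.0
    return produced_rate / total


def overall_availability(rates: Mapping[int, Tuple[float, float]],
                         treat_zero_throughput_as_unavailable: bool = True) -> float:
    """Average of per-partition availability.

    `rates` (hypothetical shape) maps partition id -> (produced_rate, error_rate).
    """
    if not rates:
        raise ValueError("at least one partition is required")
    values = [
        partition_availability(produced, errors, treat_zero_throughput_as_unavailable)
        for produced, errors in rates.values()
    ]
    return sum(values) / len(values)


# Illustrative numbers only: partition 1 fails half of its sends,
# partition 2 saw no traffic at all during the window.
rates = {0: (10.0, 0.0), 1: (5.0, 5.0), 2: (0.0, 0.0)}
print(overall_availability(rates))         # idle partition counted as 0
print(overall_availability(rates, False))  # idle partition counted as 1
```

Averaging per partition rather than pooling all sends means that one failing partition lowers the overall figure by the same amount regardless of how much traffic the other partitions carry.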

If the user overrides produce.treat.zero.throughput.as.unavailable of ProduceService to be false, ProduceService will consider the ProduceAvailability of this partition to be 1. With this setting, availability drops below 1 only when the producer catches an exception, and ProduceService prints that exception in the log for further investigation. Note that the Apache Kafka producer should throw an exception if a message cannot be sent within request.timeout.ms * (retries + 1) + retry.backoff.ms * retries.
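As a worked example of the bound above, the following sketch evaluates the expression for one set of producer settings. The function name is hypothetical and the values are illustrative, not defaults taken from this page.

```python
def max_send_time_ms(request_timeout_ms: int, retries: int, retry_backoff_ms: int) -> int:
    """Upper bound from the text:
    request.timeout.ms * (retries + 1) + retry.backoff.ms * retries."""
    return request_timeout_ms * (retries + 1) + retry_backoff_ms * retries


# Illustrative settings: 30 s request timeout, 3 retries, 100 ms backoff.
# Four attempts of up to 30 s each, plus three backoff pauses of 100 ms.
print(max_send_time_ms(30_000, 3, 100))  # 120300
```

With these numbers a failed send would surface as an exception, and therefore in error_rate, after roughly two minutes at the latest.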

ConsumeService

ConsumeService consumes messages from a topic. The messages should be produced by ProduceService. Using the incremental integer index and the timestamp provided in the message payload, ConsumeService is able to measure the message loss rate, message duplicated rate, end-to-end latency, etc. ConsumeService also measures and reports ConsumeAvailability to summarize whether messages are successfully delivered in time.

ConsumeService has built-in support for old consumer and new consumer. User can run ConsumeService with their choice of consumer and configuration. User can also implement a thin wrapper around their existing consumer implementation that implements interface com.linkedin.kmf.consumer.KMBaseConsumer, to run ConsumeService with their custom consumer implementation.

In order to measure the message loss rate and message duplicated rate, ProduceService produces messages with an integer index in the message payload. This index is incremented by 1 for every successful send per partition. ConsumeService reads the index from the message payload and compares it with the last index observed from the same partition to determine whether messages were lost or duplicated.
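The index comparison can be sketched as follows. This is a simplified illustration under stated assumptions, not the actual ConsumeService logic: the class and method names are hypothetical, the first message seen on a partition is taken as the baseline, an index below the expected one is counted as a duplicate, and a gap counts every skipped index as lost.

```python
from typing import Dict


class IndexTracker:
    """Tracks the next expected index per partition (hypothetical helper)."""

    def __init__(self) -> None:
        self.next_index: Dict[int, int] = {}
        self.lost = 0
        self.duplicated = 0

    def observe(self, partition: int, index: int) -> str:
        expected = self.next_index.get(partition)
        if expected is None or index == expected:
            # First message on this partition, or exactly the next index.
            self.next_index[partition] = index + 1
            return "ok"
        if index < expected:
            # An index at or below the last observed one was seen again.
            self.duplicated += 1
            return "duplicated"
        # index > expected: the indexes in between were skipped.
        self.lost += index - expected
        self.next_index[partition] = index + 1
        return "lost"


tracker = IndexTracker()
results = [tracker.observe(0, i) for i in (0, 1, 1, 4)]
print(results)                           # ['ok', 'ok', 'duplicated', 'lost']
print(tracker.lost, tracker.duplicated)  # 2 1
```

Keeping the state per partition matters because the index is incremented per partition on the producing side, so indexes from different partitions are not comparable.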

In order to measure end-to-end latency, the message payload should contain a timestamp taken at the time the message is constructed. ConsumeService parses the message to obtain the timestamp and determines the end-to-end latency by subtracting this timestamp from the message receive time. This allows users to monitor the latency of a pipeline of Kafka clusters connected by Mirror Makers.
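A minimal sketch of the latency computation follows. The payload format here is an assumption made for illustration: a JSON object with hypothetical fields `index` and `time` (milliseconds since the epoch). The real payload schema is not described on this page.

```python
import json
import time
from typing import Optional


def end_to_end_latency_ms(payload: str, receive_time_ms: Optional[int] = None) -> int:
    """Receive time minus the construction timestamp carried in the payload."""
    if receive_time_ms is None:
        # Default to the consumer's current wall-clock time.
        receive_time_ms = int(time.time() * 1000)
    message = json.loads(payload)
    return receive_time_ms - int(message["time"])  # "time" is a hypothetical field name


# Illustrative values: message constructed at t=1,000,000 ms, received 250 ms later.
payload = json.dumps({"index": 42, "time": 1_000_000})
print(end_to_end_latency_ms(payload, receive_time_ms=1_000_250))  # 250
```

Because the timestamp is fixed when the message is constructed, the same subtraction covers every hop between the producer and the consumer, provided the clocks on both hosts are reasonably synchronized.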

In order to measure ConsumeAvailability, ConsumeService keeps track of message consumed_rate, loss_rate and delayed_rate. All of these are measured over the past 60-second time window. consumed_rate is the rate of messages that are consumed. loss_rate is the rate of messages that are lost. A message is considered lost if a message with higher index is consumed first. delayed_rate is the rate of messages that are consumed whose latency (measured using the timestamp in the payload) exceeds the consume.latency.sla.ms in the ConsumeService config. ConsumeAvailability is measured as (consumed_rate - delayed_rate) / (consumed_rate + loss_rate). If both consumed_rate and loss_rate are 0, we consider the ConsumeAvailability to be 0.
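The formula above can be written out as a small function. This is an illustrative sketch with hypothetical names, and it assumes the three rates have already been measured over the 60-second window.

```python
def consume_availability(consumed_rate: float, loss_rate: float, delayed_rate: float) -> float:
    """(consumed_rate - delayed_rate) / (consumed_rate + loss_rate)."""
    total = consumed_rate + loss_rate
    if total == 0:
        # Nothing consumed and nothing known to be lost: treated as unavailable.
        return 0.0
    return (consumed_rate - delayed_rate) / total


# Illustrative rates: 90 msg/s consumed, 10 msg/s lost,
# and 9 msg/s of the consumed messages exceeded consume.latency.sla.ms.
print(consume_availability(90.0, 10.0, 9.0))
```

In this example 81 of every 100 produced messages arrive within the latency SLA, so both lost and late messages pull the figure below 1.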

TopicManagementService

TopicManagementService manages the monitor topic of a cluster to ensure that every broker is the leader of at least one partition of the monitor topic, so that the produce-availability metric will drop below 1 if any broker fails. To achieve this, TopicManagementService monitors the number of brokers, the number of partitions, the partition assignment across brokers and the leader distribution across brokers. It may expand partitions, reassign partitions or trigger a preferred leader election if needed.
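The leadership condition can be checked with a simple set difference, sketched below. The function name and the input shapes (a list of broker ids and a partition-to-leader mapping) are hypothetical. In practice this information would come from cluster metadata.

```python
from typing import Dict, Iterable, Set


def brokers_without_leadership(broker_ids: Iterable[int],
                               partition_leaders: Dict[int, int]) -> Set[int]:
    """Brokers that lead no partition of the monitor topic.

    `partition_leaders` maps partition id -> broker id of the current leader.
    """
    return set(broker_ids) - set(partition_leaders.values())


# Illustrative cluster: broker 2 leads nothing, so a failure of broker 2
# would not be reflected in the produce-availability metric.
leaders = {0: 1, 1: 1, 2: 3}
print(brokers_without_leadership([1, 2, 3], leaders))  # {2}
```

An empty result means the condition holds. A non-empty result is the situation in which partition expansion, reassignment or a preferred leader election would be needed.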

TopicManagementService can create the topic with user-specified configs (e.g. replication factor and partition-to-broker ratio) if the fabricated topic doesn't exist yet. By default it creates the topic using Apache Kafka's API. Users can also implement the interface com.linkedin.kmf.topicfactory.TopicFactory to provide custom topic creation logic, e.g., logic that sets up an access control list for the fabricated topic in their cluster.

TopicManagementService will also automatically trigger preferred leader election, expand partitions of the monitor topic, or reassign partitions across brokers to ensure that the leadership of partitions of the monitor topic is evenly distributed across brokers. For example, if a new broker is added to the Kafka cluster, TopicManagementService will reassign some partitions of the monitor topic to the new broker, and expand the partitions of the monitor topic if necessary to ensure that partitionNum > topic-management.partitionsToBrokersRatio * brokerNum.
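The partition-count condition can be illustrated with the check below. It takes the strict inequality exactly as written in the text. The function name is hypothetical, and how the actual implementation handles the equality case is not confirmed here.

```python
def needs_partition_expansion(partition_num: int, broker_num: int,
                              partitions_to_brokers_ratio: float) -> bool:
    """True when partitionNum > ratio * brokerNum does not hold."""
    return not (partition_num > partitions_to_brokers_ratio * broker_num)


# Illustrative numbers: 7 partitions with a ratio of 2.0.
print(needs_partition_expansion(7, 3, 2.0))  # False: 7 > 6 holds with 3 brokers
print(needs_partition_expansion(7, 4, 2.0))  # True: 7 > 8 fails after a 4th broker joins
```

This mirrors the example in the text: adding a broker raises the right-hand side of the inequality, so a topic that was large enough before may need more partitions afterwards.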
