Skip to content

Commit

Permalink
KAFKA-4381: Add per partition lag metrics to the consumer
Browse files Browse the repository at this point in the history
Author: Jiangjie Qin <[email protected]>

Reviewers: Ismael Juma <[email protected]>, Ewen Cheslack-Postava <[email protected]>

Closes apache#2329 from becketqin/KAFKA-4381
  • Loading branch information
becketqin authored and ewencp committed Jan 13, 2017
1 parent e90db3e commit 88fdca2
Show file tree
Hide file tree
Showing 10 changed files with 218 additions and 25 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -663,7 +663,7 @@ private KafkaConsumer(ConsumerConfig config,
this.client = new ConsumerNetworkClient(netClient, metadata, time, retryBackoffMs,
config.getInt(ConsumerConfig.REQUEST_TIMEOUT_MS_CONFIG));
OffsetResetStrategy offsetResetStrategy = OffsetResetStrategy.valueOf(config.getString(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG).toUpperCase(Locale.ROOT));
this.subscriptions = new SubscriptionState(offsetResetStrategy);
this.subscriptions = new SubscriptionState(offsetResetStrategy, metrics);
List<PartitionAssignor> assignors = config.getConfiguredInstances(
ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG,
PartitionAssignor.class);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,9 +3,9 @@
* file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file
* to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the
* License. You may obtain a copy of the License at
*
*
* http://www.apache.org/licenses/LICENSE-2.0
*
*
* Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
Expand Down Expand Up @@ -34,6 +34,8 @@
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.regex.Pattern;
import org.apache.kafka.common.metrics.Metrics;


/**
* A mock of the {@link Consumer} interface you can use for testing code that uses Kafka. This class is <i> not
Expand All @@ -57,7 +59,7 @@ public class MockConsumer<K, V> implements Consumer<K, V> {
private AtomicBoolean wakeup;

public MockConsumer(OffsetResetStrategy offsetResetStrategy) {
this.subscriptions = new SubscriptionState(offsetResetStrategy);
this.subscriptions = new SubscriptionState(offsetResetStrategy, new Metrics());
this.partitions = new HashMap<>();
this.records = new HashMap<>();
this.paused = new HashSet<>();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@
import org.apache.kafka.common.metrics.stats.Count;
import org.apache.kafka.common.metrics.stats.Max;
import org.apache.kafka.common.metrics.stats.Rate;
import org.apache.kafka.common.metrics.stats.Value;
import org.apache.kafka.common.protocol.Errors;
import org.apache.kafka.common.record.InvalidRecordException;
import org.apache.kafka.common.record.LogEntry;
Expand Down Expand Up @@ -500,6 +501,11 @@ private List<ConsumerRecord<K, V>> drainRecords(PartitionRecords<K, V> partition
"position to {}", position, partitionRecords.partition, nextOffset);

subscriptions.position(partitionRecords.partition, nextOffset);
Long partitionLag = subscriptions.partitionLag(partitionRecords.partition);
if (partitionLag != null) {
this.sensors.recordsFetchLag.record(partitionLag);
this.sensors.recordPartitionFetchLag(partitionRecords.partition, partitionLag);
}
return partRecords;
} else {
// these records aren't next in line based on the last consumed position, ignore them
Expand Down Expand Up @@ -763,12 +769,23 @@ private PartitionRecords<K, V> parseFetchedData(CompletedFetch completedFetch) {
if (!parsed.isEmpty()) {
log.trace("Adding fetched record for partition {} with offset {} to buffered record list", tp, position);
parsedRecords = new PartitionRecords<>(fetchOffset, tp, parsed);
ConsumerRecord<K, V> record = parsed.get(parsed.size() - 1);
this.sensors.recordsFetchLag.record(partition.highWatermark - record.offset());
} else if (partition.highWatermark >= 0) {
log.trace("Received empty fetch response for partition {} with offset {}", tp, position);
this.sensors.recordsFetchLag.record(partition.highWatermark - fetchOffset);
}

if (partition.highWatermark >= 0) {
log.trace("Received {} records in fetch response for partition {} with offset {}", parsed.size(), tp, position);
Long partitionLag = subscriptions.partitionLag(tp);
subscriptions.updateHighWatermark(tp, partition.highWatermark);
// If the partition lag is null, that means this is the first fetch response for this partition.
// We update the lag here to create the lag metric. This is to handle the case that there is no
// message consumed by the end user from this partition. If there are messages returned from the
// partition, the lag will be updated when those messages are consumed by the end user.
if (partitionLag == null) {
partitionLag = subscriptions.partitionLag(tp);
this.sensors.recordsFetchLag.record(partitionLag);
this.sensors.recordPartitionFetchLag(tp, partitionLag);
}
}

} else if (error == Errors.NOT_LEADER_FOR_PARTITION) {
log.debug("Error in fetch for partition {}: {}", tp, error.exceptionName());
this.metadata.requestUpdate();
Expand Down Expand Up @@ -1057,6 +1074,23 @@ public void recordTopicFetchMetrics(String topic, int bytes, int records) {
}
recordsFetched.record(records);
}

public void recordPartitionFetchLag(TopicPartition tp, long lag) {
String name = tp + ".records-lag";
Sensor recordsLag = this.metrics.getSensor(name);
if (recordsLag == null) {
recordsLag = this.metrics.sensor(name);
recordsLag.add(this.metrics.metricName(name, this.metricGrpName, "The latest lag of the partition"),
new Value());
recordsLag.add(this.metrics.metricName(name + "-max",
this.metricGrpName,
"The max lag of the partition"), new Max());
recordsLag.add(this.metrics.metricName(name + "-avg",
this.metricGrpName,
"The average lag of the partition"), new Avg());
}
recordsLag.record(lag);
}
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
import org.apache.kafka.clients.consumer.OffsetResetStrategy;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.internals.PartitionStates;
import org.apache.kafka.common.metrics.Metrics;

import java.util.ArrayList;
import java.util.Collection;
Expand Down Expand Up @@ -80,13 +81,16 @@ private enum SubscriptionType {
/* Listener to be invoked when assignment changes */
private ConsumerRebalanceListener listener;

public SubscriptionState(OffsetResetStrategy defaultResetStrategy) {
private final Metrics metrics;

public SubscriptionState(OffsetResetStrategy defaultResetStrategy, Metrics metrics) {
this.defaultResetStrategy = defaultResetStrategy;
this.subscription = Collections.emptySet();
this.assignment = new PartitionStates<>();
this.groupSubscription = new HashSet<>();
this.needsFetchCommittedOffsets = true; // initialize to true for the consumers to fetch offset upon starting up
this.subscribedPattern = null;
this.metrics = metrics;
this.subscriptionType = SubscriptionType.NONE;
}

Expand Down Expand Up @@ -156,6 +160,7 @@ public void assignFromUser(Set<TopicPartition> partitions) {
setSubscriptionType(SubscriptionType.USER_ASSIGNED);

if (!this.assignment.partitionSet().equals(partitions)) {
removeAllLagSensors(partitions);
Map<TopicPartition, TopicPartitionState> partitionToState = new HashMap<>();
for (TopicPartition partition : partitions) {
TopicPartitionState state = assignment.stateValue(partition);
Expand All @@ -175,6 +180,8 @@ public void assignFromUser(Set<TopicPartition> partitions) {
public void assignFromSubscribed(Collection<TopicPartition> assignments) {
if (!this.partitionsAutoAssigned())
throw new IllegalArgumentException("Attempt to dynamically assign partitions while manual assignment in use");
Set<TopicPartition> newAssignment = new HashSet<>(assignments);
removeAllLagSensors(newAssignment);

for (TopicPartition tp : assignments)
if (!this.subscription.contains(tp.topic()))
Expand All @@ -185,6 +192,13 @@ public void assignFromSubscribed(Collection<TopicPartition> assignments) {
this.needsFetchCommittedOffsets = true;
}

private void removeAllLagSensors(Set<TopicPartition> preservedPartitions) {
for (TopicPartition tp : assignment.partitionSet()) {
if (!preservedPartitions.contains(tp))
metrics.removeSensor(tp + ".records-lag");
}
}

private Map<TopicPartition, TopicPartitionState> partitionToStateMap(Collection<TopicPartition> assignments) {
Map<TopicPartition, TopicPartitionState> map = new HashMap<>(assignments.size());
for (TopicPartition tp : assignments)
Expand Down Expand Up @@ -305,6 +319,15 @@ public Long position(TopicPartition tp) {
return assignedState(tp).position;
}

public Long partitionLag(TopicPartition tp) {
TopicPartitionState topicPartitionState = assignedState(tp);
return topicPartitionState.highWatermark == null ? null : topicPartitionState.highWatermark - topicPartitionState.position;
}

public void updateHighWatermark(TopicPartition tp, long highWatermark) {
assignedState(tp).highWatermark = highWatermark;
}

public Map<TopicPartition, OffsetAndMetadata> allConsumed() {
Map<TopicPartition, OffsetAndMetadata> allConsumed = new HashMap<>();
for (PartitionStates.PartitionState<TopicPartitionState> state : assignment.partitionStates()) {
Expand Down Expand Up @@ -380,13 +403,15 @@ public ConsumerRebalanceListener listener() {

private static class TopicPartitionState {
private Long position; // last consumed position
private Long highWatermark; // the high watermark from last fetch
private OffsetAndMetadata committed; // last committed position
private boolean paused; // whether this partition has been paused by the user
private OffsetResetStrategy resetStrategy; // the strategy to use if the offset needs resetting

public TopicPartitionState() {
this.paused = false;
this.position = null;
this.highWatermark = null;
this.committed = null;
this.resetStrategy = null;
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE
* file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file
* to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the
* License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*/
package org.apache.kafka.common.metrics.stats;

import org.apache.kafka.common.metrics.MeasurableStat;
import org.apache.kafka.common.metrics.MetricConfig;

/**
* An instantaneous value.
*/
public class Value implements MeasurableStat {
private double value = 0;

@Override
public double measure(MetricConfig config, long now) {
return value;
}

@Override
public void record(MetricConfig config, double value, long timeMs) {
this.value = value;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -108,7 +108,7 @@ public class KafkaConsumerTest {

private final String topic3 = "test3";
private final TopicPartition t3p0 = new TopicPartition(topic3, 0);

@Rule
public ExpectedException expectedException = ExpectedException.none();

Expand Down Expand Up @@ -692,7 +692,7 @@ public void testWakeupWithFetchDataAvailable() {
ConsumerRecords<String, String> records = consumer.poll(0);
assertEquals(5, records.count());
}

@Test
public void testPollThrowsInterruptExceptionIfInterrupted() throws Exception {
int rebalanceTimeoutMs = 60000;
Expand All @@ -712,7 +712,7 @@ public void testPollThrowsInterruptExceptionIfInterrupted() throws Exception {

final KafkaConsumer<String, String> consumer = newConsumer(time, client, metadata, assignor,
rebalanceTimeoutMs, sessionTimeoutMs, heartbeatIntervalMs, false, 0);

consumer.subscribe(Arrays.asList(topic), getConsumerRebalanceListener(consumer));
prepareRebalance(client, node, assignor, Arrays.asList(tp0), null);

Expand Down Expand Up @@ -1434,7 +1434,7 @@ private KafkaConsumer<String, String> newConsumer(Time time,
ConsumerInterceptors<String, String> interceptors = null;

Metrics metrics = new Metrics();
SubscriptionState subscriptions = new SubscriptionState(autoResetStrategy);
SubscriptionState subscriptions = new SubscriptionState(autoResetStrategy, metrics);
ConsumerNetworkClient consumerClient = new ConsumerNetworkClient(client, metadata, time, retryBackoffMs, requestTimeoutMs);
ConsumerCoordinator consumerCoordinator = new ConsumerCoordinator(
consumerClient,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -113,7 +113,7 @@ public class ConsumerCoordinatorTest {
public void setup() {
this.time = new MockTime();
this.client = new MockClient(time);
this.subscriptions = new SubscriptionState(OffsetResetStrategy.EARLIEST);
this.subscriptions = new SubscriptionState(OffsetResetStrategy.EARLIEST, metrics);
this.metadata = new Metadata(0, Long.MAX_VALUE);
this.metadata.update(cluster, time.milliseconds());
this.consumerClient = new ConsumerNetworkClient(client, metadata, time, 100, 1000);
Expand Down Expand Up @@ -279,7 +279,7 @@ public void testCoordinatorDisconnect() {
}

@Test(expected = ApiException.class)
public void testJoinGroupInvalidGroupId() {
public void testJoinGroupInvalidGroupId() {
final String consumerId = "leader";

subscriptions.subscribe(singleton(topicName), rebalanceListener);
Expand Down Expand Up @@ -619,7 +619,7 @@ public boolean matches(AbstractRequest body) {
}

@Test
public void testMetadataChangeTriggersRebalance() {
public void testMetadataChangeTriggersRebalance() {
final String consumerId = "consumer";

// ensure metadata is up-to-date for leader
Expand Down Expand Up @@ -703,7 +703,7 @@ public boolean matches(AbstractRequest body) {


@Test
public void testExcludeInternalTopicsConfigOption() {
public void testExcludeInternalTopicsConfigOption() {
subscriptions.subscribe(Pattern.compile(".*"), rebalanceListener);

metadata.update(TestUtils.singletonCluster(TestUtils.GROUP_METADATA_TOPIC_NAME, 2), time.milliseconds());
Expand All @@ -720,7 +720,7 @@ public void testIncludeInternalTopicsConfigOption() {

assertTrue(subscriptions.subscription().contains(TestUtils.GROUP_METADATA_TOPIC_NAME));
}

@Test
public void testRejoinGroup() {
String otherTopic = "otherTopic";
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -90,9 +90,9 @@ public class FetcherTest {
private MockClient client = new MockClient(time, metadata);
private Cluster cluster = TestUtils.singletonCluster(topicName, 1);
private Node node = cluster.nodes().get(0);
private SubscriptionState subscriptions = new SubscriptionState(OffsetResetStrategy.EARLIEST);
private SubscriptionState subscriptionsNoAutoReset = new SubscriptionState(OffsetResetStrategy.NONE);
private Metrics metrics = new Metrics(time);
private SubscriptionState subscriptions = new SubscriptionState(OffsetResetStrategy.EARLIEST, metrics);
private SubscriptionState subscriptionsNoAutoReset = new SubscriptionState(OffsetResetStrategy.NONE, metrics);
private static final double EPSILON = 0.0001;
private ConsumerNetworkClient consumerClient = new ConsumerNetworkClient(client, metadata, time, 100, 1000);

Expand Down Expand Up @@ -657,7 +657,7 @@ public void testFetcherMetrics() {
for (int v = 0; v < 3; v++)
builder.appendWithOffset((long) v, Record.NO_TIMESTAMP, "key".getBytes(), String.format("value-%d", v).getBytes());
fetchRecords(builder.build(), Errors.NONE.code(), 200L, 0);
assertEquals(198, recordsFetchLagMax.value(), EPSILON);
assertEquals(197, recordsFetchLagMax.value(), EPSILON);
}

private Map<TopicPartition, List<ConsumerRecord<byte[], byte[]>>> fetchRecords(MemoryRecords records, short error, long hw, int throttleTime) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.clients.consumer.OffsetResetStrategy;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.metrics.Metrics;
import org.junit.Test;

import java.util.Arrays;
Expand All @@ -36,7 +37,7 @@

public class SubscriptionStateTest {

private final SubscriptionState state = new SubscriptionState(OffsetResetStrategy.EARLIEST);
private final SubscriptionState state = new SubscriptionState(OffsetResetStrategy.EARLIEST, new Metrics());
private final String topic = "test";
private final String topic1 = "test1";
private final TopicPartition tp0 = new TopicPartition(topic, 0);
Expand Down
Loading

0 comments on commit 88fdca2

Please sign in to comment.