Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

MINOR: make Sensor#add idempotent #4853

Merged
merged 3 commits into from
Apr 12, 2018
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
53 changes: 32 additions & 21 deletions clients/src/main/java/org/apache/kafka/common/metrics/Sensor.java
Original file line number Diff line number Diff line change
Expand Up @@ -24,8 +24,11 @@
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashSet;
import java.util.LinkedHashMap;
import java.util.LinkedList;
import java.util.List;
import java.util.Locale;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.TimeUnit;

Expand All @@ -40,7 +43,7 @@ public final class Sensor {
private final String name;
private final Sensor[] parents;
private final List<Stat> stats;
private final List<KafkaMetric> metrics;
private final Map<MetricName, KafkaMetric> metrics;
private final MetricConfig config;
private final Time time;
private volatile long lastRecordTime;
Expand Down Expand Up @@ -103,7 +106,7 @@ public boolean shouldRecord(final int configId) {
this.registry = registry;
this.name = Utils.notNull(name);
this.parents = parents == null ? new Sensor[0] : parents;
this.metrics = new ArrayList<>();
this.metrics = new LinkedHashMap<>();
this.stats = new ArrayList<>();
this.config = config;
this.time = time;
Expand Down Expand Up @@ -190,7 +193,7 @@ public void checkQuotas() {
}

public void checkQuotas(long timeMs) {
for (KafkaMetric metric : this.metrics) {
for (KafkaMetric metric : this.metrics.values()) {
MetricConfig config = metric.config();
if (config != null) {
Quota quota = config.quota();
Expand Down Expand Up @@ -228,9 +231,11 @@ public synchronized boolean add(CompoundStat stat, MetricConfig config) {
this.stats.add(Utils.notNull(stat));
Object lock = new Object();
for (NamedMeasurable m : stat.stats()) {
KafkaMetric metric = new KafkaMetric(lock, m.name(), m.stat(), config == null ? this.config : config, time);
this.registry.registerMetric(metric);
this.metrics.add(metric);
final KafkaMetric metric = new KafkaMetric(lock, m.name(), m.stat(), config == null ? this.config : config, time);
if (!metrics.containsKey(metric.metricName())) {
registry.registerMetric(metric);
metrics.put(metric.metricName(), metric);
}
}
return true;
}
Expand All @@ -247,24 +252,30 @@ public boolean add(MetricName metricName, MeasurableStat stat) {

/**
* Register a metric with this sensor
*
* @param metricName The name of the metric
* @param stat The statistic to keep
* @param config A special configuration for this metric. If null use the sensor default configuration.
* @param stat The statistic to keep
* @param config A special configuration for this metric. If null use the sensor default configuration.
* @return true if metric is added to sensor, false if sensor is expired
*/
public synchronized boolean add(MetricName metricName, MeasurableStat stat, MetricConfig config) {
if (hasExpired())
public synchronized boolean add(final MetricName metricName, final MeasurableStat stat, final MetricConfig config) {
if (hasExpired()) {
return false;

KafkaMetric metric = new KafkaMetric(new Object(),
Utils.notNull(metricName),
Utils.notNull(stat),
config == null ? this.config : config,
time);
this.registry.registerMetric(metric);
this.metrics.add(metric);
this.stats.add(stat);
return true;
} else if (metrics.containsKey(metricName)) {
return true;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should we log here indicating an add function called for the same metric name?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This behavior is similar to the idempotent sensor creation at https://github.com/vvcephei/kafka/blob/1578db793d54482adaa6a90d4ca1fce45afb929a/clients/src/main/java/org/apache/kafka/common/metrics/Metrics.java#L399

Interestingly, the choice there was to log only when the sensor got created, not when it was a no-op.

I'm open to whatever.

} else {
final KafkaMetric metric = new KafkaMetric(
new Object(),
Utils.notNull(metricName),
Utils.notNull(stat),
config == null ? this.config : config,
time
);
registry.registerMetric(metric);
metrics.put(metric.metricName(), metric);
stats.add(stat);
return true;
}
}

/**
Expand All @@ -276,6 +287,6 @@ public boolean hasExpired() {
}

synchronized List<KafkaMetric> metrics() {
return Collections.unmodifiableList(this.metrics);
return Collections.unmodifiableList(new LinkedList<>(this.metrics.values()));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -16,23 +16,25 @@
*/
package org.apache.kafka.common.metrics;

import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;

import java.util.Arrays;
import java.util.Collections;
import java.util.Map;
import java.util.concurrent.TimeUnit;

import org.apache.kafka.common.MetricName;
import org.apache.kafka.common.metrics.stats.Avg;
import org.apache.kafka.common.metrics.stats.Meter;
import org.apache.kafka.common.metrics.stats.Sum;
import org.apache.kafka.common.utils.MockTime;
import org.apache.kafka.common.utils.SystemTime;
import org.apache.kafka.common.utils.Time;
import org.junit.Test;

import java.util.Arrays;
import java.util.Collections;
import java.util.Map;
import java.util.concurrent.TimeUnit;

import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;

public class SensorTest {
@Test
public void testRecordLevelEnum() {
Expand Down Expand Up @@ -94,4 +96,32 @@ public void testExpiredSensor() {

metrics.close();
}

@Test
public void testIdempotentAdd() {
final Metrics metrics = new Metrics();
final Sensor sensor = metrics.sensor("sensor");

assertTrue(sensor.add(metrics.metricName("test-metric", "test-group"), new Avg()));

// adding the same metric to the same sensor is a no-op
assertTrue(sensor.add(metrics.metricName("test-metric", "test-group"), new Avg()));


// but adding the same metric to a DIFFERENT sensor is an error
final Sensor anotherSensor = metrics.sensor("another-sensor");
try {
anotherSensor.add(metrics.metricName("test-metric", "test-group"), new Avg());
fail("should have thrown");
} catch (final IllegalArgumentException ignored) {
// pass
}

// note that adding a different metric with the same name is also a no-op
assertTrue(sensor.add(metrics.metricName("test-metric", "test-group"), new Sum()));

// so after all this, we still just have the original metric registered
assertEquals(1, sensor.metrics().size());
assertEquals(org.apache.kafka.common.metrics.stats.Avg.class, sensor.metrics().get(0).measurable().getClass());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -20,11 +20,10 @@
import org.apache.kafka.common.metrics.Sensor;
import org.apache.kafka.common.metrics.stats.Avg;
import org.apache.kafka.common.metrics.stats.Max;
import org.apache.kafka.common.utils.MockTime;
import org.apache.kafka.connect.runtime.ConnectMetrics.MetricGroup;
import org.apache.kafka.connect.runtime.ConnectMetrics.MetricGroupId;
import org.apache.kafka.common.utils.MockTime;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;

Expand Down Expand Up @@ -145,21 +144,21 @@ public void testMetricGroupIdWithoutTags() {

@Test
public void testRecreateWithClose() {
int numMetrics = addToGroup(metrics, false);
int numMetricsInRecreatedGroup = addToGroup(metrics, true);
Assert.assertEquals(numMetrics, numMetricsInRecreatedGroup);
final Sensor originalSensor = addToGroup(metrics, false);
final Sensor recreatedSensor = addToGroup(metrics, true);
// because we closed the metricGroup, we get a brand-new sensor
assertNotSame(originalSensor, recreatedSensor);
}

@Test(expected = IllegalArgumentException.class)
@Test
public void testRecreateWithoutClose() {
int numMetrics = addToGroup(metrics, false);
int numMetricsInRecreatedGroup = addToGroup(metrics, false);
// we should never get here
throw new RuntimeException("Created " + numMetricsInRecreatedGroup
+ " metrics in recreated group. Original=" + numMetrics);
final Sensor originalSensor = addToGroup(metrics, false);
final Sensor recreatedSensor = addToGroup(metrics, false);
// since we didn't close the group, the second addToGroup is idempotent
assertSame(originalSensor, recreatedSensor);
}

private int addToGroup(ConnectMetrics connectMetrics, boolean shouldClose) {
private Sensor addToGroup(ConnectMetrics connectMetrics, boolean shouldClose) {
ConnectMetricsRegistry registry = connectMetrics.registry();
ConnectMetrics.MetricGroup metricGroup = connectMetrics.group(registry.taskGroupName(),
registry.connectorTagName(), "conn_name");
Expand All @@ -172,7 +171,7 @@ private int addToGroup(ConnectMetrics connectMetrics, boolean shouldClose) {
sensor.add(metricName("x1"), new Max());
sensor.add(metricName("y2"), new Avg());

return metricGroup.metrics().metrics().size();
return sensor;
}

static MetricName metricName(String name) {
Expand Down