Skip to content

Commit

Permalink
KAFKA-14559: Fix JMX tool to handle the object names with wildcard an…
Browse files Browse the repository at this point in the history
…d optional attributes (#13060)

Reviewers: Federico Valeri <[email protected]>, Satish Duggana <[email protected]>
  • Loading branch information
kamalcph authored May 11, 2023
1 parent bd65db8 commit 54a4067
Show file tree
Hide file tree
Showing 2 changed files with 153 additions and 21 deletions.
40 changes: 19 additions & 21 deletions tools/src/main/java/org/apache/kafka/tools/JmxTool.java
Original file line number Diff line number Diff line change
Expand Up @@ -82,7 +82,7 @@ public static void main(String[] args) {
List<ObjectName> queries = options.queries();
boolean hasPatternQueries = queries.stream().filter(Objects::nonNull).anyMatch(ObjectName::isPattern);

Set<ObjectName> found = findObjectsIfNoPattern(options, conn, queries, hasPatternQueries);
Set<ObjectName> found = findObjects(options, conn, queries, hasPatternQueries);
Map<ObjectName, Integer> numExpectedAttributes =
findNumExpectedAttributes(conn, attributesInclude, hasPatternQueries, queries, found);

Expand Down Expand Up @@ -113,8 +113,8 @@ public static void main(String[] args) {
}
}

private static String mkString(Stream<Object> stream, String delimeter) {
return stream.filter(Objects::nonNull).map(Object::toString).collect(Collectors.joining(delimeter));
private static String mkString(Stream<Object> stream, String delimiter) {
return stream.filter(Objects::nonNull).map(Object::toString).collect(Collectors.joining(delimiter));
}

private static int sumValues(Map<ObjectName, Integer> numExpectedAttributes) {
Expand Down Expand Up @@ -162,26 +162,24 @@ private static MBeanServerConnection connectToBeanServer(JmxToolOptions options)
return serverConn;
}

private static Set<ObjectName> findObjectsIfNoPattern(JmxToolOptions options,
MBeanServerConnection conn,
List<ObjectName> queries,
boolean hasPatternQueries) throws Exception {
private static Set<ObjectName> findObjects(JmxToolOptions options,
MBeanServerConnection conn,
List<ObjectName> queries,
boolean hasPatternQueries) throws Exception {
long waitTimeoutMs = 10_000;
Set<ObjectName> result = new HashSet<>();
Set<ObjectName> querySet = new HashSet<>(queries);
BiPredicate<Set<ObjectName>, Set<ObjectName>> foundAllObjects = (s1, s2) -> s1.containsAll(s2);
if (!hasPatternQueries) {
long start = System.currentTimeMillis();
do {
if (!result.isEmpty()) {
System.err.println("Could not find all object names, retrying");
TimeUnit.MILLISECONDS.sleep(100);
}
result.addAll(queryObjects(conn, queries));
} while (options.hasWait() && System.currentTimeMillis() - start < waitTimeoutMs && !foundAllObjects.test(querySet, result));
}
BiPredicate<Set<ObjectName>, Set<ObjectName>> foundAllObjects = Set::containsAll;
long start = System.currentTimeMillis();
do {
if (!result.isEmpty()) {
System.err.println("Could not find all object names, retrying");
TimeUnit.MILLISECONDS.sleep(100);
}
result.addAll(queryObjects(conn, queries));
} while (!hasPatternQueries && options.hasWait() && System.currentTimeMillis() - start < waitTimeoutMs && !foundAllObjects.test(querySet, result));

if (options.hasWait() && !foundAllObjects.test(querySet, result)) {
if (!hasPatternQueries && options.hasWait() && !foundAllObjects.test(querySet, result)) {
querySet.removeAll(result);
String missing = mkString(querySet.stream().map(Object::toString), ",");
throw new TerseException(String.format("Could not find all requested object names after %d ms. Missing %s", waitTimeoutMs, missing));
Expand Down Expand Up @@ -218,7 +216,7 @@ private static Map<ObjectName, Integer> findNumExpectedAttributes(MBeanServerCon
}
});
} else {
if (!hasPatternQueries) {
if (hasPatternQueries) {
found.forEach(objectName -> {
try {
MBeanInfo mBeanInfo = conn.getMBeanInfo(objectName);
Expand All @@ -237,7 +235,7 @@ private static Map<ObjectName, Integer> findNumExpectedAttributes(MBeanServerCon
}
});
} else {
queries.forEach(objectName -> result.put(objectName, attributesInclude.get().length));
found.forEach(objectName -> result.put(objectName, attributesInclude.get().length));
}
}

Expand Down
134 changes: 134 additions & 0 deletions tools/src/test/java/org/apache/kafka/tools/JmxToolTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -174,6 +174,140 @@ public void filteredMetrics() {
assertEquals("3.0", csv.get("kafka.server:type=BrokerTopicMetrics,name=MessagesInPerSec:FiveMinuteRate"));
}

@Test
public void testDomainNamePattern() {
String[] args = new String[]{
"--jmx-url", jmxUrl,
"--object-name", "kafka.serve?:*",
"--attributes", "FifteenMinuteRate,FiveMinuteRate",
"--report-format", "csv",
"--one-time"
};
String out = executeAndGetOut(args);
assertNormalExit();

Map<String, String> csv = parseCsv(out);
assertEquals("1.0", csv.get("kafka.server:type=BrokerTopicMetrics,name=MessagesInPerSec:FifteenMinuteRate"));
assertEquals("3.0", csv.get("kafka.server:type=BrokerTopicMetrics,name=MessagesInPerSec:FiveMinuteRate"));
}

@Test
public void testDomainNamePatternWithNoAttributes() {
String[] args = new String[]{
"--jmx-url", jmxUrl,
"--object-name", "kafka.serve?:*",
"--report-format", "csv",
"--one-time"
};
String out = executeAndGetOut(args);
assertNormalExit();

Map<String, String> csv = parseCsv(out);
assertEquals("1.0", csv.get("kafka.server:type=BrokerTopicMetrics,name=MessagesInPerSec:FifteenMinuteRate"));
assertEquals("3.0", csv.get("kafka.server:type=BrokerTopicMetrics,name=MessagesInPerSec:FiveMinuteRate"));
}

@Test
public void testPropertyListPattern() {
String[] args = new String[]{
"--jmx-url", jmxUrl,
"--object-name", "kafka.server:type=BrokerTopicMetrics,*",
"--attributes", "FifteenMinuteRate,FiveMinuteRate",
"--report-format", "csv",
"--one-time"
};
String out = executeAndGetOut(args);
assertNormalExit();

Map<String, String> csv = parseCsv(out);
assertEquals("1.0", csv.get("kafka.server:type=BrokerTopicMetrics,name=MessagesInPerSec:FifteenMinuteRate"));
assertEquals("3.0", csv.get("kafka.server:type=BrokerTopicMetrics,name=MessagesInPerSec:FiveMinuteRate"));
}

@Test
public void testPropertyListPatternWithNoAttributes() {
String[] args = new String[]{
"--jmx-url", jmxUrl,
"--object-name", "kafka.server:type=BrokerTopicMetrics,*",
"--report-format", "csv",
"--one-time"
};
String out = executeAndGetOut(args);
assertNormalExit();

Map<String, String> csv = parseCsv(out);
assertEquals("1.0", csv.get("kafka.server:type=BrokerTopicMetrics,name=MessagesInPerSec:FifteenMinuteRate"));
assertEquals("3.0", csv.get("kafka.server:type=BrokerTopicMetrics,name=MessagesInPerSec:FiveMinuteRate"));
}

@Test
public void testPropertyValuePattern() {
String[] args = new String[]{
"--jmx-url", jmxUrl,
"--object-name", "kafka.server:type=BrokerTopicMetrics,name=*InPerSec",
"--attributes", "FifteenMinuteRate,FiveMinuteRate",
"--report-format", "csv",
"--one-time"
};
String out = executeAndGetOut(args);
assertNormalExit();

Map<String, String> csv = parseCsv(out);
assertEquals("1.0", csv.get("kafka.server:type=BrokerTopicMetrics,name=MessagesInPerSec:FifteenMinuteRate"));
assertEquals("3.0", csv.get("kafka.server:type=BrokerTopicMetrics,name=MessagesInPerSec:FiveMinuteRate"));
}

@Test
public void testPropertyValuePatternWithNoAttributes() {
String[] args = new String[]{
"--jmx-url", jmxUrl,
"--object-name", "kafka.server:type=BrokerTopicMetrics,name=*InPerSec",
"--report-format", "csv",
"--one-time"
};
String out = executeAndGetOut(args);
assertNormalExit();

Map<String, String> csv = parseCsv(out);
assertEquals("1.0", csv.get("kafka.server:type=BrokerTopicMetrics,name=MessagesInPerSec:FifteenMinuteRate"));
assertEquals("3.0", csv.get("kafka.server:type=BrokerTopicMetrics,name=MessagesInPerSec:FiveMinuteRate"));
}

@Test
// Combination of property-list and property-value patterns
public void testPropertyPattern() {
String[] args = new String[]{
"--jmx-url", jmxUrl,
"--object-name", "kafka.server:type=*,*",
"--attributes", "FifteenMinuteRate,FiveMinuteRate",
"--report-format", "csv",
"--one-time"
};
String out = executeAndGetOut(args);
assertNormalExit();

Map<String, String> csv = parseCsv(out);
assertEquals("1.0", csv.get("kafka.server:type=BrokerTopicMetrics,name=MessagesInPerSec:FifteenMinuteRate"));
assertEquals("3.0", csv.get("kafka.server:type=BrokerTopicMetrics,name=MessagesInPerSec:FiveMinuteRate"));
}

@Test
// Combination of property-list and property-value patterns
public void testPropertyPatternWithNoAttributes() {
String[] args = new String[]{
"--jmx-url", jmxUrl,
"--object-name", "kafka.server:type=*,*",
"--report-format", "csv",
"--one-time"
};
String out = executeAndGetOut(args);
assertNormalExit();

Map<String, String> csv = parseCsv(out);
assertEquals("1.0", csv.get("kafka.server:type=BrokerTopicMetrics,name=MessagesInPerSec:FifteenMinuteRate"));
assertEquals("3.0", csv.get("kafka.server:type=BrokerTopicMetrics,name=MessagesInPerSec:FiveMinuteRate"));
}

@Test
public void dateFormat() {
String dateFormat = "yyyyMMdd-hh:mm:ss";
Expand Down

0 comments on commit 54a4067

Please sign in to comment.