Skip to content

Commit

Permalink
HIVE-25959: Expose Compaction Observability delta metrics using the J…
Browse files Browse the repository at this point in the history
…sonReporter (Viktor Csomor, reviewed by Karen Coppage)

- Custom MapMetrics has been included into the MetricsRegistry.
- The MetricsRegistry will call the listeners for the built-in metrics. Since the MapMetrics is not a built-in one it is causing an IllegalArgument Exception. This exception is handled.
- A new Jackson JSON module has been added (MapCapableJsonMetricsModule) to add serialization of the `mbeans`
- MapCapableJsonMetricsModule has been registered to the JsonReporter
- AcidMetricsService now leveraging both the on the new metrics and the MetricsMBeanImpl+JMX

Closes apache#3032.
  • Loading branch information
vcsomor committed Feb 22, 2022
1 parent f15de94 commit bf09d78
Show file tree
Hide file tree
Showing 12 changed files with 514 additions and 130 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -161,7 +161,7 @@ protected void runAcidMetricService() {
}

protected Table newTable(String dbName, String tableName, boolean partitioned) throws TException {
return newTable(dbName, tableName, partitioned, new HashMap<String, String>(), null, false);
return newTable(dbName, tableName, partitioned, new HashMap<>(), null, false);
}

protected Table newTable(String dbName, String tableName, boolean partitioned,
Expand All @@ -180,7 +180,7 @@ protected Table newTable(String dbName, String tableName, boolean partitioned,
table.setDbName(dbName);
table.setOwner("me");
table.setSd(newStorageDescriptor(getLocation(tableName, null), sortCols));
List<FieldSchema> partKeys = new ArrayList<FieldSchema>(1);
List<FieldSchema> partKeys = new ArrayList<>(1);
if (partitioned) {
partKeys.add(new FieldSchema("ds", "string", "no comment"));
table.setPartitionKeys(partKeys);
Expand Down Expand Up @@ -673,14 +673,16 @@ protected long compactInTxn(CompactionRequest rqst) throws Exception {
return compactorTxnId;
}

protected Map<String, String> gaugeToMap(String metric) throws Exception {

protected static Map<String, Integer> gaugeToMap(String metric) throws Exception {
MBeanServer mbs = ManagementFactory.getPlatformMBeanServer();
ObjectName oname = new ObjectName(AcidMetricService.OBJECT_NAME_PREFIX + metric);
MBeanInfo mbeanInfo = mbs.getMBeanInfo(oname);

Map<String, String> result = new HashMap<>();
Map<String, Integer> result = new HashMap<>();
for (MBeanAttributeInfo attr : mbeanInfo.getAttributes()) {
result.put(attr.getName(), String.valueOf(mbs.getAttribute(oname, attr.getName())));
Object attribute = mbs.getAttribute(oname, attr.getName());
result.put(attr.getName(), Integer.valueOf(String.valueOf(attribute)));
}
return result;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,14 +17,17 @@
*/
package org.apache.hadoop.hive.ql.txn.compactor;

import com.google.common.collect.ImmutableMap;
import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hadoop.hive.metastore.conf.MetastoreConf;
import org.junit.Assert;
import org.apache.hadoop.hive.metastore.metrics.Metrics;
import org.hamcrest.CoreMatchers;
import org.junit.Test;

import static org.apache.hadoop.hive.metastore.metrics.MetricsConstants.COMPACTION_NUM_DELTAS;
import static org.apache.hadoop.hive.metastore.metrics.MetricsConstants.COMPACTION_NUM_OBSOLETE_DELTAS;
import static org.apache.hadoop.hive.metastore.metrics.MetricsConstants.COMPACTION_NUM_SMALL_DELTAS;
import static org.hamcrest.MatcherAssert.assertThat;

public class TestDeltaFilesMetricFlags extends CompactorTest {

Expand All @@ -43,7 +46,9 @@ public void testDeltaFilesMetricFromInitiatorWithMetricsDisabled() throws Except
conf.setBoolean(MetastoreConf.ConfVars.METRICS_ENABLED.getVarname(), false);
setup(conf);
startInitiator();
Assert.assertEquals(0, gaugeToMap(COMPACTION_NUM_DELTAS).size());
assertThat(Metrics.getOrCreateMapMetrics(COMPACTION_NUM_DELTAS).get(),
CoreMatchers.is(ImmutableMap.of()));
gaugeToMap(COMPACTION_NUM_DELTAS);
}

@Test(expected = javax.management.InstanceNotFoundException.class)
Expand All @@ -52,7 +57,9 @@ public void testDeltaFilesMetricFromWorkerWithMetricsDisabled() throws Exception
conf.setBoolean(MetastoreConf.ConfVars.METRICS_ENABLED.getVarname(), false);
setup(conf);
startWorker();
Assert.assertEquals(0, gaugeToMap(COMPACTION_NUM_SMALL_DELTAS).size());
assertThat(gaugeToMap(COMPACTION_NUM_SMALL_DELTAS),
CoreMatchers.is(ImmutableMap.of()));
gaugeToMap(COMPACTION_NUM_SMALL_DELTAS);
}

@Test(expected = javax.management.InstanceNotFoundException.class)
Expand All @@ -61,7 +68,9 @@ public void testDeltaFilesMetricFromCleanerWithMetricsDisabled() throws Exceptio
conf.setBoolean(MetastoreConf.ConfVars.METRICS_ENABLED.getVarname(), false);
setup(conf);
startCleaner();
Assert.assertEquals(0, gaugeToMap(COMPACTION_NUM_OBSOLETE_DELTAS).size());
assertThat(gaugeToMap(COMPACTION_NUM_OBSOLETE_DELTAS),
CoreMatchers.is(ImmutableMap.of()));
gaugeToMap(COMPACTION_NUM_OBSOLETE_DELTAS);
}

@Test(expected = javax.management.InstanceNotFoundException.class)
Expand All @@ -70,7 +79,9 @@ public void testDeltaFilesMetricFromInitiatorWithAcidMetricsThreadDisabled() thr
conf.setBoolean(MetastoreConf.ConfVars.METASTORE_ACIDMETRICS_THREAD_ON.getVarname(), false);
setup(conf);
startInitiator();
Assert.assertEquals(0, gaugeToMap(COMPACTION_NUM_DELTAS).size());
assertThat(gaugeToMap(COMPACTION_NUM_DELTAS),
CoreMatchers.is(ImmutableMap.of()));
gaugeToMap(COMPACTION_NUM_DELTAS);
}

@Test(expected = javax.management.InstanceNotFoundException.class)
Expand All @@ -79,7 +90,9 @@ public void testDeltaFilesMetricFromWorkerWithAcidMetricsThreadDisabled() throws
conf.setBoolean(MetastoreConf.ConfVars.METASTORE_ACIDMETRICS_THREAD_ON.getVarname(), false);
setup(conf);
startWorker();
Assert.assertEquals(0, gaugeToMap(COMPACTION_NUM_SMALL_DELTAS).size());
assertThat(gaugeToMap(COMPACTION_NUM_SMALL_DELTAS),
CoreMatchers.is(ImmutableMap.of()));
gaugeToMap(COMPACTION_NUM_SMALL_DELTAS);
}

@Test(expected = javax.management.InstanceNotFoundException.class)
Expand All @@ -88,7 +101,8 @@ public void testDeltaFilesMetricFromCleanerWithAcidMetricsThreadDisabled() throw
conf.setBoolean(MetastoreConf.ConfVars.METASTORE_ACIDMETRICS_THREAD_ON.getVarname(), false);
setup(conf);
startCleaner();
Assert.assertEquals(0, gaugeToMap(COMPACTION_NUM_OBSOLETE_DELTAS).size());
assertThat(gaugeToMap(COMPACTION_NUM_OBSOLETE_DELTAS),
CoreMatchers.is(ImmutableMap.of()));
gaugeToMap(COMPACTION_NUM_OBSOLETE_DELTAS);
}

}
Loading

0 comments on commit bf09d78

Please sign in to comment.