Skip to content

Commit

Permalink
HIVE-25892: Group HMSHandler's thread locals into a single context
Browse files Browse the repository at this point in the history
  • Loading branch information
dengzhhu653 committed Jan 24, 2022
1 parent 5214369 commit 80494d8
Show file tree
Hide file tree
Showing 2 changed files with 172 additions and 116 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -169,65 +169,25 @@ public class HMSHandler extends FacebookBase implements IHMSHandler {
private Warehouse wh; // hdfs warehouse
private static Striped<Lock> tablelocks;

private static final ThreadLocal<RawStore> threadLocalMS = new ThreadLocal<RawStore>();
private static final ThreadLocal<TxnStore> threadLocalTxn = new ThreadLocal<TxnStore>();

private static final ThreadLocal<Map<String, com.codahale.metrics.Timer.Context>> timerContexts =
new ThreadLocal<Map<String, com.codahale.metrics.Timer.Context>>() {
@Override
protected Map<String, com.codahale.metrics.Timer.Context> initialValue() {
return new HashMap<>();
}
};

public static RawStore getRawStore() {
return threadLocalMS.get();
return HMSHandlerContext.getRawStore().orElse(null);
}

static void cleanupRawStore() {
try {
RawStore rs = getRawStore();
if (rs != null) {
HMSHandlerContext.getRawStore().ifPresent(rs -> {
logAndAudit("Cleaning up thread local RawStore...");
rs.shutdown();
}
});
} finally {
HMSHandler handler = threadLocalHMSHandler.get();
if (handler != null) {
HMSHandlerContext.getHMSHandler().ifPresent(handler -> {
handler.notifyMetaListenersOnShutDown();
}
threadLocalHMSHandler.remove();
threadLocalConf.remove();
threadLocalModifiedConfig.remove();
removeRawStore();
});
HMSHandlerContext.clear();
logAndAudit("Done cleaning up thread local RawStore");
}
}

static void removeRawStore() {
threadLocalMS.remove();
}

// Thread local configuration is needed as many threads could make changes
// to the conf using the connection hook
private static final ThreadLocal<Configuration> threadLocalConf = new ThreadLocal<Configuration>();

/**
* Thread local HMSHandler used during shutdown to notify meta listeners
*/
private static final ThreadLocal<HMSHandler> threadLocalHMSHandler = new ThreadLocal<>();

/**
* Thread local Map to keep track of modified meta conf keys
*/
private static final ThreadLocal<Map<String, String>> threadLocalModifiedConfig =
new ThreadLocal<Map<String, String>>() {
@Override
protected Map<String, String> initialValue() {
return new HashMap<>();
}
};

private static ExecutorService threadPool;

static final Logger auditLog = LoggerFactory.getLogger(
Expand Down Expand Up @@ -265,19 +225,6 @@ public static String getIPAddress() {
return null;
}

private static AtomicInteger nextSerialNum = new AtomicInteger();
private static ThreadLocal<Integer> threadLocalId = new ThreadLocal<Integer>() {
@Override
protected Integer initialValue() {
return nextSerialNum.getAndIncrement();
}
};

// This will only be set if the metastore is being accessed from a metastore Thrift server,
// not if it is from the CLI. Also, only if the TTransport being used to connect is an
// instance of TSocket. This is also not set when kerberos is used.
private static ThreadLocal<String> threadLocalIpAddress = new ThreadLocal<String>();

/**
* Internal function to notify listeners for meta config change events
*/
Expand All @@ -301,16 +248,10 @@ private void notifyMetaListeners(String key, String oldValue, String newValue) t
* that were modified during setMetaConf. This would get called from HiveMetaStore#cleanupRawStore
*/
private void notifyMetaListenersOnShutDown() {
Map<String, String> modifiedConf = threadLocalModifiedConfig.get();
if (modifiedConf == null) {
// Nothing got modified
return;
}
try {
Configuration conf = threadLocalConf.get();
if (conf == null) {
throw new MetaException("Unexpected: modifiedConf is non-null but conf is null");
}
Map<String, String> modifiedConf = HMSHandlerContext.getModifiedConfig();
Configuration conf = HMSHandlerContext.getConfiguration()
.orElseThrow(() -> new MetaException("Unexpected: modifiedConf is non-null but conf is null"));
// Notify listeners of the changed value
for (Map.Entry<String, String> entry : modifiedConf.entrySet()) {
String key = entry.getKey();
Expand All @@ -328,14 +269,14 @@ private void notifyMetaListenersOnShutDown() {
}

static void setThreadLocalIpAddress(String ipAddress) {
threadLocalIpAddress.set(ipAddress);
HMSHandlerContext.setIpAddress(ipAddress);
}

// This will return null if the metastore is not being accessed from a metastore Thrift server,
// or if the TTransport being used to connect is not an instance of TSocket, or if kereberos
// is used
static String getThreadLocalIpAddress() {
return threadLocalIpAddress.get();
return HMSHandlerContext.getIpAddress().orElse(null);
}

// Make it possible for tests to check that the right type of PartitionExpressionProxy was
Expand All @@ -351,12 +292,12 @@ PartitionExpressionProxy getExpressionProxy() {
*/
@Deprecated
public static Integer get() {
return threadLocalId.get();
return HMSHandlerContext.getThreadId();
}

@Override
public int getThreadId() {
return threadLocalId.get();
return HMSHandlerContext.getThreadId();
}

public HMSHandler(String name) throws MetaException {
Expand Down Expand Up @@ -608,34 +549,21 @@ private void authorizeTableForPartitionMetadata(
isServerFilterEnabled, filterHook, catName, dbName, tblName);
}

private static String addPrefix(String s) {
return threadLocalId.get() + ": " + s;
}

/**
* Set copy of invoking HMSHandler on thread local
*/
private static void setHMSHandler(HMSHandler handler) {
if (threadLocalHMSHandler.get() == null) {
threadLocalHMSHandler.set(handler);
}
}
@Override
public void setConf(Configuration conf) {
threadLocalConf.set(conf);
RawStore ms = threadLocalMS.get();
if (ms != null) {
ms.setConf(conf); // reload if DS related configuration is changed
}
HMSHandlerContext.setConfiguration(conf);
// reload if DS related configuration is changed
HMSHandlerContext.getRawStore().ifPresent(ms -> ms.setConf(conf));
}

@Override
public Configuration getConf() {
Configuration conf = threadLocalConf.get();
if (conf == null) {
conf = new Configuration(this.conf);
threadLocalConf.set(conf);
}
Configuration conf = HMSHandlerContext.getConfiguration()
.orElseGet(() -> {
Configuration configuration = new Configuration(this.conf);
HMSHandlerContext.setConfiguration(configuration);
return configuration;
});
return conf;
}

Expand All @@ -659,13 +587,15 @@ public void setMetaConf(String key, String value) throws MetaException {
Configuration configuration = getConf();
String oldValue = MetastoreConf.get(configuration, key);
// Save prev val of the key on threadLocal
Map<String, String> modifiedConf = threadLocalModifiedConfig.get();
Map<String, String> modifiedConf = HMSHandlerContext.getModifiedConfig();
if (!modifiedConf.containsKey(key)) {
modifiedConf.put(key, oldValue);
}
// Set invoking HMSHandler on threadLocal, this will be used later to notify
// metaListeners in HiveMetaStore#cleanupRawStore
setHMSHandler(this);
if (!HMSHandlerContext.getHMSHandler().isPresent()) {
HMSHandlerContext.setHMSHandler(this);
}
configuration.set(key, value);
notifyMetaListeners(key, oldValue, value);

Expand Down Expand Up @@ -696,7 +626,7 @@ public RawStore getMS() throws MetaException {
}

public static RawStore getMSForConf(Configuration conf) throws MetaException {
RawStore ms = threadLocalMS.get();
RawStore ms = getRawStore();
if (ms == null) {
ms = newRawStoreForConf(conf);
try {
Expand All @@ -705,32 +635,22 @@ public static RawStore getMSForConf(Configuration conf) throws MetaException {
ms.shutdown();
throw e;
}
threadLocalMS.set(ms);
ms = threadLocalMS.get();
LOG.info("Created RawStore: " + ms + " from thread id: " + Thread.currentThread().getId());
HMSHandlerContext.setRawStore(ms);
LOG.info("Created RawStore: " + ms + " from thread id: " + HMSHandlerContext.getThreadId());
}
return ms;
}

@Override
public TxnStore getTxnHandler() {
return getMsThreadTxnHandler(conf);
}

public static TxnStore getMsThreadTxnHandler(Configuration conf) {
TxnStore txn = threadLocalTxn.get();
if (txn == null) {
txn = TxnUtils.getTxnStore(conf);
threadLocalTxn.set(txn);
}
return txn;
return HMSHandlerContext.getTxnStore(conf);
}

static RawStore newRawStoreForConf(Configuration conf) throws MetaException {
Configuration newConf = new Configuration(conf);
String rawStoreClassName = MetastoreConf.getVar(newConf, ConfVars.RAW_STORE_IMPL);
LOG.info(addPrefix("Opening raw store with implementation class:" + rawStoreClassName));
return RawStoreProxy.getProxy(newConf, conf, rawStoreClassName, threadLocalId.get());
LOG.info("{}: Opening raw store with implementation class: {}", HMSHandlerContext.getThreadId(), rawStoreClassName);
return RawStoreProxy.getProxy(newConf, conf, rawStoreClassName, HMSHandlerContext.getThreadId());
}

@VisibleForTesting
Expand Down Expand Up @@ -916,7 +836,7 @@ private void addAdminUsers_core() throws MetaException {
}

private static void logAndAudit(final String m) {
LOG.debug("{}: {}", threadLocalId.get(), m);
LOG.debug("{}: {}", HMSHandlerContext.getThreadId(), m);
logAuditEvent(m);
}

Expand All @@ -928,7 +848,7 @@ private String startFunction(String function, String extraLogInfo) {
Metrics.getOrCreateTimer(MetricsConstants.API_PREFIX + function);
if (timer != null) {
// Timer will be null we aren't using the metrics
timerContexts.get().put(function, timer.time());
HMSHandlerContext.getTimerContexts().put(function, timer.time());
}
Counter counter = Metrics.getOrCreateCounter(MetricsConstants.ACTIVE_CALLS + function);
if (counter != null) {
Expand Down Expand Up @@ -972,7 +892,7 @@ private void endFunction(String function, boolean successful, Exception e,
}

private void endFunction(String function, MetaStoreEndFunctionContext context) {
com.codahale.metrics.Timer.Context timerContext = timerContexts.get().remove(function);
com.codahale.metrics.Timer.Context timerContext = HMSHandlerContext.getTimerContexts().remove(function);
if (timerContext != null) {
long timeTaken = timerContext.stop();
LOG.debug((getThreadLocalIpAddress() == null ? "" : "source:" + getThreadLocalIpAddress() + " ") +
Expand Down Expand Up @@ -6504,7 +6424,7 @@ public String get_config_value(String name, String defaultValue)
toReturn = defaultValue;
}
} catch (RuntimeException e) {
LOG.error(threadLocalId.get().toString() + ": "
LOG.error(HMSHandlerContext.getThreadId() + ": "
+ "RuntimeException thrown in get_config_value - msg: "
+ e.getMessage() + " cause: " + e.getCause());
}
Expand Down
Loading

0 comments on commit 80494d8

Please sign in to comment.