diff --git a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/HMSHandler.java b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/HMSHandler.java index d186c4fccc99..b387cd5508fc 100644 --- a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/HMSHandler.java +++ b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/HMSHandler.java @@ -169,65 +169,25 @@ public class HMSHandler extends FacebookBase implements IHMSHandler { private Warehouse wh; // hdfs warehouse private static Striped tablelocks; - private static final ThreadLocal threadLocalMS = new ThreadLocal(); - private static final ThreadLocal threadLocalTxn = new ThreadLocal(); - - private static final ThreadLocal> timerContexts = - new ThreadLocal>() { - @Override - protected Map initialValue() { - return new HashMap<>(); - } - }; - public static RawStore getRawStore() { - return threadLocalMS.get(); + return HMSHandlerContext.getRawStore().orElse(null); } - static void cleanupRawStore() { + static void cleanupHandlerContext() { try { - RawStore rs = getRawStore(); - if (rs != null) { + HMSHandlerContext.getRawStore().ifPresent(rs -> { logAndAudit("Cleaning up thread local RawStore..."); rs.shutdown(); - } + }); } finally { - HMSHandler handler = threadLocalHMSHandler.get(); - if (handler != null) { + HMSHandlerContext.getHMSHandler().ifPresent(handler -> { handler.notifyMetaListenersOnShutDown(); - } - threadLocalHMSHandler.remove(); - threadLocalConf.remove(); - threadLocalModifiedConfig.remove(); - removeRawStore(); + }); + HMSHandlerContext.clear(); logAndAudit("Done cleaning up thread local RawStore"); } } - static void removeRawStore() { - threadLocalMS.remove(); - } - - // Thread local configuration is needed as many threads could make changes - // to the conf using the connection hook - private static final ThreadLocal threadLocalConf = new ThreadLocal(); - - /** - * Thread local HMSHandler used during shutdown to notify meta listeners - */ - private static final ThreadLocal threadLocalHMSHandler = new ThreadLocal<>(); - - /** - * Thread local Map to keep track of modified meta conf keys - */ - private static final ThreadLocal> threadLocalModifiedConfig = - new ThreadLocal>() { - @Override - protected Map initialValue() { - return new HashMap<>(); - } - }; - private static ExecutorService threadPool; static final Logger auditLog = LoggerFactory.getLogger( @@ -265,19 +225,6 @@ public static String getIPAddress() { return null; } - private static AtomicInteger nextSerialNum = new AtomicInteger(); - private static ThreadLocal threadLocalId = new ThreadLocal() { - @Override - protected Integer initialValue() { - return nextSerialNum.getAndIncrement(); - } - }; - - // This will only be set if the metastore is being accessed from a metastore Thrift server, - // not if it is from the CLI. Also, only if the TTransport being used to connect is an - // instance of TSocket. This is also not set when kerberos is used. - private static ThreadLocal threadLocalIpAddress = new ThreadLocal(); - /** * Internal function to notify listeners for meta config change events */ @@ -298,19 +245,13 @@ private void notifyMetaListeners(String key, String oldValue, String newValue) t /** * Internal function to notify listeners to revert back to old values of keys - * that were modified during setMetaConf. This would get called from HiveMetaStore#cleanupRawStore + * that were modified during setMetaConf. This would get called from HiveMetaStore#cleanupHandlerContext */ private void notifyMetaListenersOnShutDown() { - Map modifiedConf = threadLocalModifiedConfig.get(); - if (modifiedConf == null) { - // Nothing got modified - return; - } try { - Configuration conf = threadLocalConf.get(); - if (conf == null) { - throw new MetaException("Unexpected: modifiedConf is non-null but conf is null"); - } + Map modifiedConf = HMSHandlerContext.getModifiedConfig(); + Configuration conf = HMSHandlerContext.getConfiguration() + .orElseThrow(() -> new MetaException("Unexpected: modifiedConf is non-null but conf is null")); // Notify listeners of the changed value for (Map.Entry entry : modifiedConf.entrySet()) { String key = entry.getKey(); @@ -328,14 +269,14 @@ private void notifyMetaListenersOnShutDown() { } static void setThreadLocalIpAddress(String ipAddress) { - threadLocalIpAddress.set(ipAddress); + HMSHandlerContext.setIpAddress(ipAddress); } // This will return null if the metastore is not being accessed from a metastore Thrift server, // or if the TTransport being used to connect is not an instance of TSocket, or if kereberos // is used static String getThreadLocalIpAddress() { - return threadLocalIpAddress.get(); + return HMSHandlerContext.getIpAddress().orElse(null); } // Make it possible for tests to check that the right type of PartitionExpressionProxy was @@ -351,12 +292,12 @@ PartitionExpressionProxy getExpressionProxy() { */ @Deprecated public static Integer get() { - return threadLocalId.get(); + return HMSHandlerContext.getThreadId(); } @Override public int getThreadId() { - return threadLocalId.get(); + return HMSHandlerContext.getThreadId(); } public HMSHandler(String name) throws MetaException { @@ -608,34 +549,21 @@ private void authorizeTableForPartitionMetadata( isServerFilterEnabled, filterHook, catName, dbName, tblName); } - private static String addPrefix(String s) { - return threadLocalId.get() + ": " + s; - } - - /** - * Set copy of invoking HMSHandler on thread local - */ - private static void setHMSHandler(HMSHandler handler) { - if (threadLocalHMSHandler.get() == null) { - threadLocalHMSHandler.set(handler); - } - } @Override public void setConf(Configuration conf) { - threadLocalConf.set(conf); - RawStore ms = threadLocalMS.get(); - if (ms != null) { - ms.setConf(conf); // reload if DS related configuration is changed - } + HMSHandlerContext.setConfiguration(conf); + // reload if DS related configuration is changed + HMSHandlerContext.getRawStore().ifPresent(ms -> ms.setConf(conf)); } @Override public Configuration getConf() { - Configuration conf = threadLocalConf.get(); - if (conf == null) { - conf = new Configuration(this.conf); - threadLocalConf.set(conf); - } + Configuration conf = HMSHandlerContext.getConfiguration() + .orElseGet(() -> { + Configuration configuration = new Configuration(this.conf); + HMSHandlerContext.setConfiguration(configuration); + return configuration; + }); return conf; } @@ -659,13 +587,15 @@ public void setMetaConf(String key, String value) throws MetaException { Configuration configuration = getConf(); String oldValue = MetastoreConf.get(configuration, key); // Save prev val of the key on threadLocal - Map modifiedConf = threadLocalModifiedConfig.get(); + Map modifiedConf = HMSHandlerContext.getModifiedConfig(); if (!modifiedConf.containsKey(key)) { modifiedConf.put(key, oldValue); } // Set invoking HMSHandler on threadLocal, this will be used later to notify - // metaListeners in HiveMetaStore#cleanupRawStore - setHMSHandler(this); + // metaListeners in HiveMetaStore#cleanupHandlerContext + if (!HMSHandlerContext.getHMSHandler().isPresent()) { + HMSHandlerContext.setHMSHandler(this); + } configuration.set(key, value); notifyMetaListeners(key, oldValue, value); @@ -696,7 +626,7 @@ public RawStore getMS() throws MetaException { } public static RawStore getMSForConf(Configuration conf) throws MetaException { - RawStore ms = threadLocalMS.get(); + RawStore ms = getRawStore(); if (ms == null) { ms = newRawStoreForConf(conf); try { @@ -705,32 +635,22 @@ public static RawStore getMSForConf(Configuration conf) throws MetaException { ms.shutdown(); throw e; } - threadLocalMS.set(ms); - ms = threadLocalMS.get(); - LOG.info("Created RawStore: " + ms + " from thread id: " + Thread.currentThread().getId()); + HMSHandlerContext.setRawStore(ms); + LOG.info("Created RawStore: {}", ms); } return ms; } @Override public TxnStore getTxnHandler() { - return getMsThreadTxnHandler(conf); - } - - public static TxnStore getMsThreadTxnHandler(Configuration conf) { - TxnStore txn = threadLocalTxn.get(); - if (txn == null) { - txn = TxnUtils.getTxnStore(conf); - threadLocalTxn.set(txn); - } - return txn; + return HMSHandlerContext.getTxnStore(conf); } static RawStore newRawStoreForConf(Configuration conf) throws MetaException { Configuration newConf = new Configuration(conf); String rawStoreClassName = MetastoreConf.getVar(newConf, ConfVars.RAW_STORE_IMPL); - LOG.info(addPrefix("Opening raw store with implementation class:" + rawStoreClassName)); - return RawStoreProxy.getProxy(newConf, conf, rawStoreClassName, threadLocalId.get()); + LOG.info("Opening raw store with implementation class: {}", rawStoreClassName); + return RawStoreProxy.getProxy(newConf, conf, rawStoreClassName, HMSHandlerContext.getThreadId()); } @VisibleForTesting @@ -916,7 +836,7 @@ private void addAdminUsers_core() throws MetaException { } private static void logAndAudit(final String m) { - LOG.debug("{}: {}", threadLocalId.get(), m); + LOG.debug(m); logAuditEvent(m); } @@ -928,7 +848,7 @@ private String startFunction(String function, String extraLogInfo) { Metrics.getOrCreateTimer(MetricsConstants.API_PREFIX + function); if (timer != null) { // Timer will be null we aren't using the metrics - timerContexts.get().put(function, timer.time()); + HMSHandlerContext.getTimerContexts().put(function, timer.time()); } Counter counter = Metrics.getOrCreateCounter(MetricsConstants.ACTIVE_CALLS + function); if (counter != null) { @@ -972,7 +892,7 @@ private void endFunction(String function, boolean successful, Exception e, } private void endFunction(String function, MetaStoreEndFunctionContext context) { - com.codahale.metrics.Timer.Context timerContext = timerContexts.get().remove(function); + com.codahale.metrics.Timer.Context timerContext = HMSHandlerContext.getTimerContexts().remove(function); if (timerContext != null) { long timeTaken = timerContext.stop(); LOG.debug((getThreadLocalIpAddress() == null ? "" : "source:" + getThreadLocalIpAddress() + " ") + @@ -995,7 +915,7 @@ public fb_status getStatus() { @Override public void shutdown() { - cleanupRawStore(); + cleanupHandlerContext(); PerfLogger.getPerfLogger(false).cleanupPerfLogMetrics(); } @@ -6504,8 +6424,7 @@ public String get_config_value(String name, String defaultValue) toReturn = defaultValue; } } catch (RuntimeException e) { - LOG.error(threadLocalId.get().toString() + ": " - + "RuntimeException thrown in get_config_value - msg: " + LOG.error("RuntimeException thrown in get_config_value - msg: " + e.getMessage() + " cause: " + e.getCause()); } success = true; diff --git a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/HMSHandlerContext.java b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/HMSHandlerContext.java new file mode 100644 index 000000000000..f46c91749436 --- /dev/null +++ b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/HMSHandlerContext.java @@ -0,0 +1,126 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hive.metastore; + +import java.util.HashMap; +import java.util.Map; +import java.util.Optional; +import java.util.concurrent.atomic.AtomicInteger; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hive.metastore.txn.TxnStore; +import org.apache.hadoop.hive.metastore.txn.TxnUtils; + +/** + * When one hms client connects in, we create a handler context for it. + * We store session information here. + */ +public final class HMSHandlerContext { + + private static final ThreadLocal context = ThreadLocal.withInitial(() -> new HMSHandlerContext()); + + private static final AtomicInteger nextSerialNum = new AtomicInteger(); + + private RawStore rawStore; + + private TxnStore txnStore; + + // Thread local HMSHandler used during shutdown to notify meta listeners + private HMSHandler hmsHandler; + + // Thread local configuration is needed as many threads could make changes + // to the conf using the connection hook + private Configuration configuration; + + // Thread local Map to keep track of modified meta conf keys + private Map modifiedConfig = new HashMap<>(); + + private Integer threadId = nextSerialNum.incrementAndGet(); + // This will only be set if the metastore is being accessed from a metastore Thrift server, + // not if it is from the CLI. Also, only if the TTransport being used to connect is an + // instance of TSocket. This is also not set when kerberos is used. + private String ipAddress; + + private Map timerContexts = new HashMap<>(); + + private HMSHandlerContext() { + } + + public static Optional getRawStore() { + return Optional.ofNullable(context.get().rawStore); + } + + public static Optional getHMSHandler() { + return Optional.ofNullable(context.get().hmsHandler); + } + + public static Optional getIpAddress() { + return Optional.ofNullable(context.get().ipAddress); + } + + public static Optional getConfiguration() { + return Optional.ofNullable(context.get().configuration); + } + + public static TxnStore getTxnStore(Configuration conf) { + if (context.get().txnStore == null) { + setTxnStore(TxnUtils.getTxnStore(conf)); + } + return context.get().txnStore; + } + + public static Map getModifiedConfig() { + return context.get().modifiedConfig; + } + + public static Integer getThreadId() { + return context.get().threadId; + } + + public static Map getTimerContexts() { + return context.get().timerContexts; + } + + public static void setRawStore(RawStore rawStore) { + context.get().rawStore = rawStore; + } + + public static void setTxnStore(TxnStore txnStore) { + context.get().txnStore = txnStore; + } + + public static void setHMSHandler(HMSHandler hmsHandler) { + context.get().hmsHandler = hmsHandler; + } + + public static void setConfiguration(Configuration conf) { + context.get().configuration = conf; + } + + public static void setIpAddress(String ipAddress) { + context.get().ipAddress = ipAddress; + } + + /** + * Release the context + */ + public static void clear() { + context.remove(); + } + +} diff --git a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/HiveMetaStore.java b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/HiveMetaStore.java index 2af557e2edf1..a5845bf9f95e 100644 --- a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/HiveMetaStore.java +++ b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/HiveMetaStore.java @@ -469,7 +469,7 @@ public void deleteContext(ServerContext serverContext, TProtocol tProtocol, TPro Metrics.getOpenConnectionsCounter().dec(); // If the IMetaStoreClient#close was called, HMSHandler#shutdown would have already // cleaned up thread local RawStore. Otherwise, do it now. - HMSHandler.cleanupRawStore(); + HMSHandler.cleanupHandlerContext(); } @Override