Skip to content

Commit

Permalink
[#10531] Apply AsyncConnection of HBase
Browse files Browse the repository at this point in the history
  • Loading branch information
emeroad committed Dec 4, 2023
1 parent 3f7fbc7 commit 7b427ea
Show file tree
Hide file tree
Showing 33 changed files with 1,964 additions and 132 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import com.navercorp.pinpoint.common.server.executor.ExecutorProperties;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.security.User;
import org.springframework.beans.factory.FactoryBean;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.context.annotation.Bean;
Expand All @@ -32,9 +33,9 @@
@org.springframework.context.annotation.Configuration
public class BatchHbaseClientConfiguration {
@Bean
public FactoryBean<Connection> batchConnectionFactory(Configuration configuration,
public FactoryBean<Connection> batchConnectionFactory(Configuration configuration, User user,
@Qualifier("batchHbaseThreadPool") ExecutorService executorService) {
return new ConnectionFactoryBean(configuration, executorService);
return new ConnectionFactoryBean(configuration, user, executorService);
}

@Bean
Expand Down
2 changes: 2 additions & 0 deletions collector/src/main/resources/hbase-root.properties
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,8 @@ hbase.client.properties.hbase.rpc.timeout=10000
hbase.client.properties.hbase.client.operation.timeout=10000
# hbase default: 20m
hbase.client.properties.hbase.client.meta.operation.timeout=10000
# hbase.client.scanner.max.result.size. hbase default: 2MB
hbase.client.properties.hbase.client.scanner.max.result.size=512000

# hbase socket read timeout. default: 200000
hbase.client.properties.hbase.ipc.client.socket.timeout.read=20000
Expand Down
5 changes: 5 additions & 0 deletions commons-hbase/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,11 @@
<groupId>org.apache.hbase</groupId>
<artifactId>hbase-shaded-client</artifactId>
</dependency>
<!-- <dependency>-->
<!-- <groupId>org.apache.hbase</groupId>-->
<!-- <artifactId>hbase-client</artifactId>-->
<!-- <version>${hbase.shaded.client.version}</version>-->
<!-- </dependency>-->

<dependency>
<groupId>com.sematext.hbasewd</groupId>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,8 @@

package com.navercorp.pinpoint.common.hbase;

import com.navercorp.pinpoint.common.hbase.async.AdvancedAsyncTableCallback;
import com.navercorp.pinpoint.common.hbase.async.AsyncTableCallback;
import com.navercorp.pinpoint.common.util.CollectionUtils;
import com.sematext.hbase.wd.AbstractRowKeyDistributor;
import org.apache.hadoop.hbase.CompareOperator;
Expand Down Expand Up @@ -302,4 +304,14 @@ public <T> List<T> find(TableName tableName, Scan scan, RowMapper<T> action) {
return delegate.find(tableName, scan, action);
}

@Override
public <T> T executeAsync(TableName tableName, AdvancedAsyncTableCallback<T> action) {
return delegate.executeAsync(tableName, action);
}

@Override
public <T> T executeAsync(TableName tableName, AsyncTableCallback<T> action) {
return delegate.executeAsync(tableName, action);
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -19,13 +19,13 @@
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.security.User;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.springframework.beans.factory.DisposableBean;
import org.springframework.beans.factory.FactoryBean;
import org.springframework.beans.factory.InitializingBean;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.stereotype.Component;

import java.io.IOException;
Expand All @@ -40,8 +40,8 @@
public class ConnectionFactoryBean implements FactoryBean<Connection>, InitializingBean, DisposableBean {

private final Logger logger = LogManager.getLogger(this.getClass());
private final User user;

private HbaseSecurityInterceptor hbaseSecurityInterceptor = new EmptyHbaseSecurityInterceptor();

private HadoopResourceCleanerRegistry cleaner;

Expand All @@ -51,16 +51,15 @@ public class ConnectionFactoryBean implements FactoryBean<Connection>, Initializ

private Consumer<Connection> postProcessor;

public ConnectionFactoryBean(Configuration configuration) {
Objects.requireNonNull(configuration, "configuration");
this.configuration = configuration;
public ConnectionFactoryBean(Configuration configuration, User user) {
this.configuration = Objects.requireNonNull(configuration, "configuration");
this.user = Objects.requireNonNull(user, "user");
}

public ConnectionFactoryBean(Configuration configuration, ExecutorService executorService) {
Objects.requireNonNull(configuration, "configuration");
Objects.requireNonNull(executorService, "executorService");
this.configuration = configuration;
this.executorService = executorService;
public ConnectionFactoryBean(Configuration configuration, User user, ExecutorService executorService) {
this.configuration = Objects.requireNonNull(configuration, "configuration");
this.user = Objects.requireNonNull(user, "user");
this.executorService = Objects.requireNonNull(executorService, "executorService");
}

@Override
Expand All @@ -69,12 +68,11 @@ public void afterPropertiesSet() throws Exception {
this.cleaner.register(configuration);
}

hbaseSecurityInterceptor.process(configuration);
try {
if (executorService == null) {
connection = ConnectionFactory.createConnection(this.configuration);
connection = ConnectionFactory.createConnection(this.configuration, user);
} else {
connection = ConnectionFactory.createConnection(this.configuration, executorService);
connection = ConnectionFactory.createConnection(this.configuration, executorService, user);
}
} catch (IOException e) {
throw new HbaseSystemException(e);
Expand All @@ -85,11 +83,11 @@ public void afterPropertiesSet() throws Exception {
}
}

@Qualifier("hbaseSecurityInterceptor")
@Autowired(required = false)
public void setHbaseSecurityInterceptor(HbaseSecurityInterceptor hbaseSecurityInterceptor) {
this.hbaseSecurityInterceptor = hbaseSecurityInterceptor;
}
// @Qualifier("hbaseSecurityInterceptor")
// @Autowired(required = false)
// public void setHbaseSecurityInterceptor(HbaseSecurityProvider hbaseSecurityProvider) {
// this.hbaseSecurityProvider = hbaseSecurityProvider;
// }

@Autowired(required = false)
public void setCleaner(HadoopResourceCleanerRegistry cleaner) {
Expand All @@ -107,11 +105,8 @@ public Connection getObject() throws Exception {
}

@Override
public Class<?> getObjectType() {
if (connection == null) {
return Connection.class;
}
return connection.getClass();
public Class<Connection> getObjectType() {
return Connection.class;
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,14 +15,15 @@
*/
package com.navercorp.pinpoint.common.hbase;

import java.nio.charset.Charset;
import java.nio.charset.StandardCharsets;

import com.navercorp.pinpoint.common.hbase.async.AsyncTableFactory;
import org.apache.hadoop.conf.Configuration;
import org.springframework.util.StringUtils;

import java.nio.charset.Charset;
import java.nio.charset.StandardCharsets;

/**
* Base class for {@link HbaseTemplate} and {@link HbaseInterceptor}, defining commons properties such as {@link HTableInterfaceFactory} and {@link Configuration}.
* Base class for {@link HbaseTemplate2} , defining commons properties such as {@link TableInterfaceFactory} and {@link Configuration}.
*
* Not intended to be used directly.
*
Expand All @@ -34,6 +35,9 @@ public abstract class HbaseAccessor {
private static final Charset CHARSET = StandardCharsets.UTF_8;

private TableFactory tableFactory;

private AsyncTableFactory asyncTableFactory;

private Configuration configuration;

/**
Expand Down Expand Up @@ -74,4 +78,13 @@ public Configuration getConfiguration() {
public Charset getCharset() {
return (StringUtils.hasText(encoding) ? Charset.forName(encoding) : CHARSET);
}


public AsyncTableFactory getAsyncTableFactory() {
return asyncTableFactory;
}

public void setAsyncTableFactory(AsyncTableFactory asyncTableFactory) {
this.asyncTableFactory = asyncTableFactory;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,8 @@

package com.navercorp.pinpoint.common.hbase;

import com.navercorp.pinpoint.common.hbase.async.AdvancedAsyncTableCallback;
import com.navercorp.pinpoint.common.hbase.async.AsyncTableCallback;
import com.sematext.hbase.wd.AbstractRowKeyDistributor;
import org.apache.hadoop.hbase.CompareOperator;
import org.apache.hadoop.hbase.TableName;
Expand Down Expand Up @@ -216,4 +218,8 @@ public interface HbaseOperations2 {
*/
<T> List<T> find(TableName tableName, final Scan scan, final RowMapper<T> action);


<T> T executeAsync(TableName tableName, AdvancedAsyncTableCallback<T> action);

<T> T executeAsync(TableName tableName, AsyncTableCallback<T> action);
}
Original file line number Diff line number Diff line change
Expand Up @@ -15,12 +15,12 @@
*/
package com.navercorp.pinpoint.common.hbase;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.security.User;

/**
* @author minwoo.jung
*/
public interface HbaseSecurityInterceptor {
public interface HbaseSecurityProvider {

void process(Configuration configuration);
User login();
}
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,10 @@
@SuppressWarnings("serial")
public class HbaseSystemException extends UncategorizedDataAccessException {

public HbaseSystemException(Throwable cause) {
super(cause.getMessage(), cause);
}

public HbaseSystemException(Exception cause) {
super(cause.getMessage(), cause);
}
Expand Down
Loading

0 comments on commit 7b427ea

Please sign in to comment.