PHOENIX-938 Use higher priority queue for index updates to prevent deadlock (jyates)
jtaylor-sfdc committed May 24, 2014
1 parent d35aeb0 commit 748b76f
Showing 7 changed files with 732 additions and 29 deletions.
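Context for the change: index updates are region-server-to-region-server writes, so if they share the default RPC handler pool with client writes, the pool can fill with data-table writes that are each blocked waiting on an index write, and the cluster deadlocks. This commit routes index writes into their own priority range and handler pool. Below is a minimal wiring sketch, not part of the diff; the property names are assumed from standard HBase configuration keys, and the class names come from the files in this commit.

// Sketch: how the scheduler (server side) and controller factory (client side)
// introduced by this commit would typically be enabled.
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;

public class IndexRpcWiringSketch {

    /** Region server side: route index-priority calls to the dedicated handler pool. */
    public static Configuration serverConf() {
        Configuration conf = HBaseConfiguration.create();
        conf.set("hbase.region.server.rpc.scheduler.factory.class",
            "org.apache.hadoop.hbase.regionserver.PhoenixIndexRpcSchedulerFactory");
        return conf;
    }

    /** Client (index writer) side: tag outgoing index writes with the elevated priority. */
    public static Configuration clientConf() {
        Configuration conf = HBaseConfiguration.create();
        conf.set("hbase.rpc.controllerfactory.class",
            "org.apache.phoenix.hbase.index.IndexQosRpcControllerFactory");
        return conf;
    }
}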
@@ -0,0 +1,166 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.phoenix.end2end.index;

import static org.junit.Assert.assertEquals;

import java.util.HashMap;
import java.util.List;
import java.util.Map;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.CellScannable;
import org.apache.hadoop.hbase.CellScanner;
import org.apache.hadoop.hbase.HBaseTestingUtility;
import org.apache.hadoop.hbase.HColumnDescriptor;
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.client.HBaseAdmin;
import org.apache.hadoop.hbase.client.HTable;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.ipc.DelegatingPayloadCarryingRpcController;
import org.apache.hadoop.hbase.ipc.PayloadCarryingRpcController;
import org.apache.hadoop.hbase.ipc.RpcControllerFactory;
import org.apache.hadoop.hbase.regionserver.PhoenixIndexRpcSchedulerFactory;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.phoenix.hbase.index.IndexQosRpcControllerFactory;
import org.apache.phoenix.hbase.index.TableName;
import org.junit.After;
import org.junit.AfterClass;
import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.Rule;
import org.junit.Test;

/**
* Comprehensive test that ensures the custom index RPC handlers and controller are wired in for
* client writes.
*/
public class IndexHandlerIT {

public static class CountingIndexClientRpcFactory extends RpcControllerFactory {

private IndexQosRpcControllerFactory delegate;

public CountingIndexClientRpcFactory(Configuration conf) {
super(conf);
this.delegate = new IndexQosRpcControllerFactory(conf);
}

@Override
public PayloadCarryingRpcController newController() {
PayloadCarryingRpcController controller = delegate.newController();
return new CountingIndexClientRpcController(controller);
}

@Override
public PayloadCarryingRpcController newController(CellScanner cellScanner) {
PayloadCarryingRpcController controller = delegate.newController(cellScanner);
return new CountingIndexClientRpcController(controller);
}

@Override
public PayloadCarryingRpcController newController(List<CellScannable> cellIterables) {
PayloadCarryingRpcController controller = delegate.newController(cellIterables);
return new CountingIndexClientRpcController(controller);
}
}

public static class CountingIndexClientRpcController extends
DelegatingPayloadCarryingRpcController {

private static Map<Integer, Integer> priorityCounts = new HashMap<Integer, Integer>();

public CountingIndexClientRpcController(PayloadCarryingRpcController delegate) {
super(delegate);
}

@Override
public void setPriority(int pri) {
Integer count = priorityCounts.get(pri);
if (count == null) {
count = new Integer(0);
}
count = count.intValue() + 1;
priorityCounts.put(pri, count);
// still forward the priority to the wrapped controller so the write actually carries it
super.setPriority(pri);
}
}

private static final HBaseTestingUtility UTIL = new HBaseTestingUtility();

private static final byte[] row = Bytes.toBytes("row");
private static final byte[] family = Bytes.toBytes("FAM");
private static final byte[] qual = Bytes.toBytes("qual");
private static final HColumnDescriptor FAM1 = new HColumnDescriptor(family);

@Rule
public TableName TestTable = new TableName();

@BeforeClass
public static void setupCluster() throws Exception {
UTIL.startMiniCluster();
}

@AfterClass
public static void shutdownCluster() throws Exception {
UTIL.shutdownMiniCluster();
}

@Before
public void setup() throws Exception {
HTableDescriptor desc =
new HTableDescriptor(org.apache.hadoop.hbase.TableName.valueOf(TestTable
.getTableNameString()));
desc.addFamily(FAM1);

// create the table
HBaseAdmin admin = UTIL.getHBaseAdmin();
admin.createTable(desc);
}

@After
public void cleanup() throws Exception {
HBaseAdmin admin = UTIL.getHBaseAdmin();
admin.disableTable(TestTable.getTableName());
admin.deleteTable(TestTable.getTableName());
}

@Test
public void testClientWritesWithPriority() throws Exception {
Configuration conf = new Configuration(UTIL.getConfiguration());
// add the keys for our rpc factory
conf.set(RpcControllerFactory.CUSTOM_CONTROLLER_CONF_KEY,
CountingIndexClientRpcFactory.class.getName());
// and set the index table as the current table
conf.setStrings(IndexQosRpcControllerFactory.INDEX_TABLE_NAMES_KEY,
TestTable.getTableNameString());
HTable table = new HTable(conf, TestTable.getTableName());

// do a write to the table
Put p = new Put(row);
p.add(family, qual, new byte[] { 1, 0, 1, 0 });
table.put(p);
table.flushCommits();

// check the counts on the rpc controller
assertEquals("Didn't get the expected number of index priority writes!", (int) 1,
(int) CountingIndexClientRpcController.priorityCounts
.get(PhoenixIndexRpcSchedulerFactory.DEFAULT_INDEX_MIN_PRIORITY));

table.close();
}
}
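The assertion above relies on the client-side controller raising the priority of writes to configured index tables to PhoenixIndexRpcSchedulerFactory.DEFAULT_INDEX_MIN_PRIORITY. A minimal sketch of that decision, using only the constants the test already exercises (the helper class and method names below are hypothetical, not the real IndexQosRpcControllerFactory internals):

import java.util.Arrays;
import java.util.HashSet;
import java.util.Set;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.regionserver.PhoenixIndexRpcSchedulerFactory;
import org.apache.phoenix.hbase.index.IndexQosRpcControllerFactory;

// Hypothetical helper mirroring the priority decision the test verifies.
public class IndexPrioritySketch {

    private final Set<String> indexTables;

    public IndexPrioritySketch(Configuration conf) {
        // INDEX_TABLE_NAMES_KEY is the same key the test sets via conf.setStrings(...)
        String[] names = conf.getStrings(IndexQosRpcControllerFactory.INDEX_TABLE_NAMES_KEY);
        this.indexTables = names == null
                ? new HashSet<String>()
                : new HashSet<String>(Arrays.asList(names));
    }

    /** Priority an outgoing write to the given table should carry. */
    public int priorityFor(String tableName, int defaultPriority) {
        return indexTables.contains(tableName)
                ? PhoenixIndexRpcSchedulerFactory.DEFAULT_INDEX_MIN_PRIORITY
                : defaultPriority;
    }
}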
@@ -0,0 +1,158 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.ipc;

import java.io.IOException;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.ipc.CallRunner;
import org.apache.hadoop.hbase.ipc.RpcScheduler;
import org.apache.hadoop.hbase.ipc.RpcServer;

import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Strings;
import com.google.common.collect.Lists;

/**
* {@link RpcScheduler} that first checks to see if this is an index update before passing off the
* call to the delegate {@link RpcScheduler}.
* <p>
* We reserve the range [200, 250), by default (though it is configurable), for index priority
* writes. Currently, we don't do any prioritization within that range - all index writes are
* treated with the same priority and put into the same queue.
*/
public class PhoenixIndexRpcScheduler implements RpcScheduler {

private LinkedBlockingQueue<CallRunner> indexCallQueue;
private RpcScheduler delegate;
private final int handlerCount;
private volatile boolean running;
private int port;
private final List<Thread> handlers = Lists.newArrayList();
private int minPriority;
private int maxPriority;

public PhoenixIndexRpcScheduler(int indexHandlerCount, Configuration conf,
RpcScheduler delegate, int minPriority, int maxPriority) {
int maxQueueLength =
conf.getInt("ipc.server.max.callqueue.length", indexHandlerCount
* RpcServer.DEFAULT_MAX_CALLQUEUE_LENGTH_PER_HANDLER);
this.minPriority = minPriority;
this.maxPriority = maxPriority;

this.indexCallQueue = new LinkedBlockingQueue<CallRunner>(maxQueueLength);
this.handlerCount = indexHandlerCount;
this.delegate = delegate;
}

@Override
public void init(Context context) {
delegate.init(context);
this.port = context.getListenerAddress().getPort();
}

@Override
public void start() {
delegate.start();
running = true;
startHandlers(handlerCount, indexCallQueue, "PhoenixIndexing.");
}

@Override
public void stop() {
running = false;
for (Thread handler : handlers) {
handler.interrupt();
}
delegate.stop();
}

@Override
public void dispatch(CallRunner callTask) throws InterruptedException, IOException {
RpcServer.Call call = callTask.getCall();
int priority = call.header.getPriority();
if (minPriority <= priority && priority < maxPriority) {
indexCallQueue.put(callTask);
} else {
delegate.dispatch(callTask);
}
}

@Override
public int getGeneralQueueLength() {
// not the best way to calculate, but don't have a better way to hook
// into metrics at the moment
return this.delegate.getGeneralQueueLength() + indexCallQueue.size();
}

@Override
public int getPriorityQueueLength() {
return this.delegate.getPriorityQueueLength();
}

@Override
public int getReplicationQueueLength() {
return this.delegate.getReplicationQueueLength();
}

// ****************************************************
// Below copied from SimpleRpcScheduler for visibility
// *****************************************************
private void startHandlers(int handlerCount, final BlockingQueue<CallRunner> callQueue,
String threadNamePrefix) {
for (int i = 0; i < handlerCount; i++) {
Thread t = new Thread(new Runnable() {
@Override
public void run() {
consumerLoop(callQueue);
}
});
t.setDaemon(true);
t.setName(Strings.nullToEmpty(threadNamePrefix) + "RpcServer.handler=" + i + ",port="
+ port);
t.start();
handlers.add(t);
}
}

private void consumerLoop(BlockingQueue<CallRunner> myQueue) {
boolean interrupted = false;
try {
while (running) {
try {
CallRunner task = myQueue.take();
task.run();
} catch (InterruptedException e) {
interrupted = true;
}
}
} finally {
if (interrupted) {
Thread.currentThread().interrupt();
}
}
}

@VisibleForTesting
public void setIndexCallQueueForTesting(LinkedBlockingQueue<CallRunner> indexQueue) {
this.indexCallQueue = indexQueue;
}
}
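A minimal sketch of how the scheduler above composes with an existing RpcScheduler delegate (the handler count and priority bounds are placeholders; the production wiring lives in PhoenixIndexRpcSchedulerFactory, one of the other files in this commit):

import java.util.concurrent.LinkedBlockingQueue;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.ipc.CallRunner;
import org.apache.hadoop.hbase.ipc.PhoenixIndexRpcScheduler;
import org.apache.hadoop.hbase.ipc.RpcScheduler;

// Sketch: wrap an existing scheduler so calls with priority in [200, 250) land on the
// dedicated index queue while everything else falls through to the delegate.
public class SchedulerWiringSketch {

    public static PhoenixIndexRpcScheduler wrap(RpcScheduler delegate) {
        Configuration conf = HBaseConfiguration.create();
        PhoenixIndexRpcScheduler scheduler =
                new PhoenixIndexRpcScheduler(10 /* index handlers */, conf, delegate, 200, 250);
        // In tests the index queue can be swapped out to observe routing decisions:
        LinkedBlockingQueue<CallRunner> observed = new LinkedBlockingQueue<CallRunner>(5);
        scheduler.setIndexCallQueueForTesting(observed);
        return scheduler;
    }
}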