Skip to content

Commit

Permalink
Update SkipRange test based on improved parallelization
Browse files Browse the repository at this point in the history
  • Loading branch information
jtaylor-sfdc committed Oct 20, 2014
1 parent 7835f41 commit 8ff8fed
Show file tree
Hide file tree
Showing 2 changed files with 72 additions and 61 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,7 @@
import org.apache.phoenix.util.SchemaUtil;
import org.apache.phoenix.util.SequenceUtil;

import com.google.common.collect.Lists;
import com.google.common.collect.Maps;


Expand All @@ -96,6 +97,7 @@ public class ConnectionlessQueryServicesImpl extends DelegateQueryServices imple
private KeyValueBuilder kvBuilder;
private volatile boolean initialized;
private volatile SQLException initializationException;
private final Map<String, List<HRegionLocation>> tableSplits = Maps.newHashMap();

public ConnectionlessQueryServicesImpl(QueryServices queryServices, ConnectionInfo connInfo) {
super(queryServices);
Expand Down Expand Up @@ -123,6 +125,10 @@ public HTableInterface getTable(byte[] tableName) throws SQLException {

@Override
public List<HRegionLocation> getAllTableRegions(byte[] tableName) throws SQLException {
List<HRegionLocation> regions = tableSplits.get(Bytes.toString(tableName));
if (regions != null) {
return regions;
}
return Collections.singletonList(new HRegionLocation(
new HRegionInfo(TableName.valueOf(tableName), HConstants.EMPTY_START_ROW, HConstants.EMPTY_END_ROW),
SERVER_NAME, -1));
Expand Down Expand Up @@ -168,16 +174,49 @@ public MetaDataMutationResult getTable(PName tenantId, byte[] schemaBytes, byte[
} catch (TableNotFoundException e) {
return new MetaDataMutationResult(MutationCode.TABLE_NOT_FOUND, 0, null);
}
//return new MetaDataMutationResult(MutationCode.TABLE_ALREADY_EXISTS, 0, null);
}

private static byte[] getTableName(List<Mutation> tableMetaData, byte[] physicalTableName) {
if (physicalTableName != null) {
return physicalTableName;
}
byte[][] rowKeyMetadata = new byte[3][];
Mutation m = MetaDataUtil.getPutOnlyTableHeaderRow(tableMetaData);
byte[] key = m.getRow();
SchemaUtil.getVarChars(key, rowKeyMetadata);
byte[] schemaBytes = rowKeyMetadata[PhoenixDatabaseMetaData.SCHEMA_NAME_INDEX];
byte[] tableBytes = rowKeyMetadata[PhoenixDatabaseMetaData.TABLE_NAME_INDEX];
return SchemaUtil.getTableNameAsBytes(schemaBytes, tableBytes);
}

private static List<HRegionLocation> generateRegionLocations(byte[] physicalName, byte[][] splits) {
byte[] startKey = HConstants.EMPTY_START_ROW;
List<HRegionLocation> regions = Lists.newArrayListWithExpectedSize(splits.length);
for (byte[] split : splits) {
regions.add(new HRegionLocation(
new HRegionInfo(TableName.valueOf(physicalName), startKey, split),
SERVER_NAME, -1));
startKey = split;
}
regions.add(new HRegionLocation(
new HRegionInfo(TableName.valueOf(physicalName), startKey, HConstants.EMPTY_END_ROW),
SERVER_NAME, -1));
return regions;
}

@Override
public MetaDataMutationResult createTable(List<Mutation> tableMetaData, byte[] physicalName, PTableType tableType, Map<String,Object> tableProps, List<Pair<byte[],Map<String,Object>>> families, byte[][] splits) throws SQLException {
if (splits != null) {
byte[] tableName = getTableName(tableMetaData, physicalName);
tableSplits.put(Bytes.toString(tableName), generateRegionLocations(tableName, splits));
}
return new MetaDataMutationResult(MutationCode.TABLE_NOT_FOUND, 0, null);
}

@Override
public MetaDataMutationResult dropTable(List<Mutation> tableMetadata, PTableType tableType, boolean cascade) throws SQLException {
byte[] tableName = getTableName(tableMetadata, null);
tableSplits.remove(Bytes.toString(tableName));
return new MetaDataMutationResult(MutationCode.TABLE_ALREADY_EXISTS, 0, null);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,21 +15,19 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.phoenix.end2end;
package org.apache.phoenix.query;

import static org.apache.phoenix.util.TestUtil.TEST_PROPERTIES;
import static org.junit.Assert.assertEquals;

import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.ParameterMetaData;
import java.sql.PreparedStatement;
import java.sql.SQLException;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Properties;

import org.apache.hadoop.hbase.HRegionLocation;
Expand All @@ -53,43 +51,29 @@
import org.apache.phoenix.jdbc.PhoenixStatement;
import org.apache.phoenix.parse.FilterableStatement;
import org.apache.phoenix.parse.SelectStatement;
import org.apache.phoenix.query.KeyRange;
import org.apache.phoenix.schema.ColumnRef;
import org.apache.phoenix.schema.PDataType;
import org.apache.phoenix.schema.PDatum;
import org.apache.phoenix.schema.PTable;
import org.apache.phoenix.schema.PTableKey;
import org.apache.phoenix.schema.RowKeySchema;
import org.apache.phoenix.schema.RowKeySchema.RowKeySchemaBuilder;
import org.apache.phoenix.schema.SortOrder;
import org.apache.phoenix.schema.TableRef;
import org.apache.phoenix.util.PhoenixRuntime;
import org.apache.phoenix.util.ByteUtil;
import org.apache.phoenix.util.PropertiesUtil;
import org.apache.phoenix.util.ReadOnlyProps;
import org.apache.phoenix.util.ScanUtil;
import org.junit.BeforeClass;
import org.junit.Ignore;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
import org.junit.runners.Parameterized.Parameters;

import com.google.common.base.Function;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;


/**
* Tests for {@link SkipRangeParallelIteratorRegionSplitter}.
* TODO: Change this to be a connectionless test (ParallelIteratorsTest) with the ability to specify split points.
* -- On Connectionless, remember the split points of a table and use those when it says
* -- getRegionLocations
* -- Then drive this from a query plus getting the query plan and confirming the ranges
* -- from the plan.
*/
@RunWith(Parameterized.class)
@Category(ClientManagedTimeTest.class)
public class SkipRangeParallelIteratorRegionSplitterIT extends BaseClientManagedTimeIT {
public class ParallelIteratorsSplitTest extends BaseConnectionlessQueryTest {

private static final String TABLE_NAME = "TEST_SKIP_RANGE_PARALLEL_ITERATOR";
private static final String DDL = "CREATE TABLE " + TABLE_NAME + " (id char(3) NOT NULL PRIMARY KEY, \"value\" integer)";
Expand All @@ -104,25 +88,22 @@ public class SkipRangeParallelIteratorRegionSplitterIT extends BaseClientManaged
private final ScanRanges scanRanges;
private final List<KeyRange> expectedSplits;

public SkipRangeParallelIteratorRegionSplitterIT(Scan scan, ScanRanges scanRanges, List<KeyRange> expectedSplits) {
public ParallelIteratorsSplitTest(Scan scan, ScanRanges scanRanges, List<KeyRange> expectedSplits) {
this.scan = scan;
this.scanRanges = scanRanges;
this.expectedSplits = expectedSplits;
}

@Test
@Ignore
public void testGetSplitsWithSkipScanFilter() throws Exception {
byte[][] splits = new byte[][] {Ka1A, Ka1B, Ka1E, Ka1G, Ka1I, Ka2A};
long ts = nextTimestamp();
createTestTable(getUrl(),DDL,splits, ts-2);
String url = getUrl() + ";" + PhoenixRuntime.CURRENT_SCN_ATTRIB + "=" + ts;
createTestTable(getUrl(),DDL,splits, null);
Properties props = PropertiesUtil.deepCopy(TEST_PROPERTIES);
Connection conn = DriverManager.getConnection(url, props);
Connection conn = DriverManager.getConnection(getUrl(), props);
PhoenixConnection pconn = conn.unwrap(PhoenixConnection.class);

initTableValues();
TableRef tableRef = new TableRef(null,pconn.getMetaDataCache().getTable(new PTableKey(pconn.getTenantId(), TABLE_NAME)),ts, false);
PTable table = pconn.getMetaDataCache().getTable(new PTableKey(pconn.getTenantId(), TABLE_NAME));
TableRef tableRef = new TableRef(table);
List<HRegionLocation> regions = pconn.getQueryServices().getAllTableRegions(tableRef.getTable().getPhysicalName().getBytes());
List<KeyRange> ranges = getSplits(tableRef, scan, regions, scanRanges);
assertEquals("Unexpected number of splits: " + ranges.size(), expectedSplits.size(), ranges.size());
Expand All @@ -135,6 +116,22 @@ private static KeyRange getKeyRange(byte[] lowerRange, boolean lowerInclusive, b
return PDataType.CHAR.getKeyRange(lowerRange, lowerInclusive, upperRange, upperInclusive);
}

private static KeyRange getKeyRange(String lowerRange, boolean lowerInclusive, String upperRange, boolean upperInclusive) {
return PDataType.CHAR.getKeyRange(Bytes.toBytes(lowerRange), lowerInclusive, Bytes.toBytes(upperRange), upperInclusive);
}

private static KeyRange getKeyRange(String lowerRange, boolean lowerInclusive, byte[] upperRange, boolean upperInclusive) {
return PDataType.CHAR.getKeyRange(Bytes.toBytes(lowerRange), lowerInclusive, upperRange, upperInclusive);
}

private static KeyRange getKeyRange(byte[] lowerRange, boolean lowerInclusive, String upperRange, boolean upperInclusive) {
return PDataType.CHAR.getKeyRange(lowerRange, lowerInclusive, Bytes.toBytes(upperRange), upperInclusive);
}

private static String nextKey(String s) {
return Bytes.toString(ByteUtil.nextKey(Bytes.toBytes(s)));
}

@Parameters(name="{1} {2}")
public static Collection<Object> data() {
List<Object> testCases = Lists.newArrayList();
Expand Down Expand Up @@ -168,7 +165,7 @@ public static Collection<Object> data() {
}},
new int[] {1,1,1},
new KeyRange[] {
getKeyRange(KeyRange.UNBOUND, true, Ka1A, false)
getKeyRange("a0A", true, nextKey("a0Z"), false)
}));
// Scan range lies in between first and second, intersecting bound on second.
testCases.addAll(
Expand All @@ -183,7 +180,7 @@ public static Collection<Object> data() {
}},
new int[] {1,1,1},
new KeyRange[] {
getKeyRange(KeyRange.UNBOUND, true, Ka1A, false),
getKeyRange("a0A", true, Ka1A, false),
getKeyRange(Ka1A, true, Ka1B, false),
}));
// Scan range spans third, split into 3 due to concurrency config.
Expand Down Expand Up @@ -226,8 +223,8 @@ public static Collection<Object> data() {
}},
new int[] {1,1,1},
new KeyRange[] {
getKeyRange(Ka1E, true, Ka1G, false),
getKeyRange(Ka1G, true, Ka1I, false),
getKeyRange("a1F", true, Ka1G, false),
getKeyRange(Ka1G, true, "a1H", false),
}));
// Scan range spans more than 3 range, no split.
testCases.addAll(
Expand All @@ -248,7 +245,7 @@ public static Collection<Object> data() {
getKeyRange(Ka1A, true, Ka1B, false),
getKeyRange(Ka1B, true, Ka1E, false),
getKeyRange(Ka1G, true, Ka1I, false),
getKeyRange(Ka2A, true, KeyRange.UNBOUND, false)
getKeyRange(Ka2A, true, nextKey("b2G"), false)
}));
return testCases;
}
Expand Down Expand Up @@ -296,7 +293,7 @@ private static Collection<?> foreach(KeyRange[][] ranges, int[] widths, KeyRange
SkipScanFilter filter = new SkipScanFilter(slots, schema);
// Always set start and stop key to max to verify we are using the information in skipscan
// filter over the scan's KMIN and KMAX.
Scan scan = new Scan().setFilter(filter).setStartRow(KeyRange.UNBOUND).setStopRow(KeyRange.UNBOUND);
Scan scan = new Scan().setFilter(filter);
ScanRanges scanRanges = ScanRanges.create(schema, slots, ScanUtil.getDefaultSlotSpans(ranges.length));
List<Object> ret = Lists.newArrayList();
ret.add(new Object[] {scan, scanRanges, Arrays.<KeyRange>asList(expectedSplits)});
Expand All @@ -311,30 +308,6 @@ public List<KeyRange> apply(KeyRange[] input) {
}
};

private void initTableValues() throws SQLException {
String url = getUrl() + ";" + PhoenixRuntime.CURRENT_SCN_ATTRIB + "=" + nextTimestamp();
Properties props = PropertiesUtil.deepCopy(TEST_PROPERTIES);
Connection conn = DriverManager.getConnection(url, props);
PreparedStatement stmt = conn.prepareStatement(
"upsert into " + TABLE_NAME + " VALUES (?, ?)");
stmt.setString(1, new String("a1A"));
stmt.setInt(2, 1);
stmt.execute();
stmt.setString(1, new String("a1E"));
stmt.setInt(2, 2);
stmt.execute();
conn.commit();
conn.close();
}

@BeforeClass
@Shadower(classBeingShadowed = BaseClientManagedTimeIT.class)
public static void doSetup() throws Exception {
Map<String,String> props = Maps.newHashMapWithExpectedSize(3);
// Must update config before starting server
setUpTestDriver(new ReadOnlyProps(props.entrySet().iterator()));
}

private static List<KeyRange> getSplits(final TableRef tableRef, final Scan scan, final List<HRegionLocation> regions,
final ScanRanges scanRanges) throws SQLException {
final List<TableRef> tableRefs = Collections.singletonList(tableRef);
Expand All @@ -357,8 +330,7 @@ public ColumnRef resolveColumn(String schemaName, String tableName, String colNa
}

};
String url = getUrl() + ";" + PhoenixRuntime.CURRENT_SCN_ATTRIB + "=" + nextTimestamp();
PhoenixConnection connection = DriverManager.getConnection(url, PropertiesUtil.deepCopy(TEST_PROPERTIES)).unwrap(PhoenixConnection.class);
PhoenixConnection connection = DriverManager.getConnection(getUrl(), PropertiesUtil.deepCopy(TEST_PROPERTIES)).unwrap(PhoenixConnection.class);
final PhoenixStatement statement = new PhoenixStatement(connection);
final StatementContext context = new StatementContext(statement, resolver, scan, new SequenceManager(statement));
context.setScanRanges(scanRanges);
Expand Down

0 comments on commit 8ff8fed

Please sign in to comment.