Skip to content

Commit

Permalink
CDH-4551: Get TestHBaseSink working
Browse files Browse the repository at this point in the history
  • Loading branch information
harishreedharan committed May 22, 2012
1 parent 98afcc4 commit 9b27085
Show file tree
Hide file tree
Showing 2 changed files with 46 additions and 58 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,6 @@

import junit.framework.Assert;

import org.apache.hadoop.hbase.HBaseTestCase;
import org.apache.hadoop.hbase.HBaseTestingUtility;
import org.apache.hadoop.hbase.HColumnDescriptor;
import org.apache.hadoop.hbase.HTableDescriptor;
Expand All @@ -49,22 +48,22 @@
* Test the hbase sink writes events to a table/family properly
*/
public class TestHBaseSink {
private static HBaseTestEnv hbaseEnv;
public static Logger LOG = LoggerFactory.getLogger(TestHBaseSink.class);
public static final String DEFAULT_HOST = "qwigibo";
private static HBaseTestingUtility testingUtility = new HBaseTestingUtility();


@BeforeClass
public static void setup() throws Exception {
// expensive, so just do it once for all tests, just make sure
// that tests don't overlap (use diff tables for each test)
hbaseEnv = new HBaseTestEnv();
hbaseEnv.conf.set(HBaseTestingUtility.BASE_TEST_DIRECTORY_KEY, "build/test/data");
hbaseEnv.setUp();
testingUtility.startMiniCluster();
testingUtility.getConfiguration().set(HBaseTestingUtility.BASE_TEST_DIRECTORY_KEY, "build/test/data");
}

@AfterClass
public static void teardown() throws Exception {
hbaseEnv.tearDown();
testingUtility.shutdownMiniCluster();
}

void shipThreeEvents(HBaseSink snk) throws IOException {
Expand Down Expand Up @@ -101,32 +100,27 @@ void shipThreeEvents(HBaseSink snk) throws IOException {
/**
* Write events to a sink directly, verify by scanning HBase table. x
*/
// @Test
@Test
public void testSink() throws IOException, InterruptedException {
final String tableName = "testSinkTab";
final String tableFamily1 = "family1";
final String tableFamily2 = "family2";

HBaseAdmin admin = hbaseEnv.getHBaseAdmin();

// create the table and column family to be used by sink
HTableDescriptor desc = new HTableDescriptor(tableName);
desc.addFamily(new HColumnDescriptor(tableFamily1));
desc.addFamily(new HColumnDescriptor(tableFamily2));
// HBaseAdmin admin = new HBaseAdmin(hbaseEnv.conf);
admin.createTable(desc);
admin.flush(tableName);

System.out.println("Starting.. testSinkTab:");
byte[][] args = new byte[2][];
args[0]=tableFamily1.getBytes();
args[1]=tableFamily2.getBytes();
testingUtility.createTable(tableName.getBytes(),args);
System.out.println("Created table");
// explicit constructor rather than builder - we want to control the conf
List<QualifierSpec> spec = new ArrayList<QualifierSpec>();
spec.add(new QualifierSpec(tableFamily1, "col1", "%{attr1}"));
spec.add(new QualifierSpec(tableFamily2, "col2", "%{attr2}"));
HBaseSink snk = new HBaseSink(tableName, "%{rowkey}", spec, 0L, false,
hbaseEnv.conf);
testingUtility.getConfiguration());
shipThreeEvents(snk);

// verify that the events made it into hbase
HTable table = new HTable(hbaseEnv.conf, tableName);
HTable table = new HTable(testingUtility.getConfiguration(), tableName);
try {
for (long i = 0; i <= 2; i++) {
Result r = table.get(new Get(Bytes.toBytes("row-key" + i)));
Expand All @@ -143,37 +137,36 @@ public void testSink() throws IOException, InterruptedException {
}
} finally {
table.close();
admin.disableTable(tableName);
admin.deleteTable(tableName);
testingUtility.getHBaseAdmin().disableTable(tableName);
testingUtility.getHBaseAdmin().deleteTable(tableName);
}
}

/**
* Write events to a sink directly, verify by scanning HBase table. x
*/
// @Test
@Test
public void testSinkEmptyCol() throws IOException, InterruptedException {
final String tableName = "testSinkEmptyColTab";
final String tableFamily1 = "family1";
final String tableFamily2 = "family2";

// create the table and column family to be used by sink
HTableDescriptor desc = new HTableDescriptor(tableName);
desc.addFamily(new HColumnDescriptor(tableFamily1));
desc.addFamily(new HColumnDescriptor(tableFamily2));
HBaseAdmin admin = new HBaseAdmin(hbaseEnv.conf);
admin.createTable(desc);

System.out.println("Starting.. testSinkTab:");
byte[][] args = new byte[2][];
args[0]=tableFamily1.getBytes();
args[1]=tableFamily2.getBytes();
testingUtility.createTable(tableName.getBytes(),args);
System.out.println("Created table");
// explicit constructor rather than builder - we want to control the conf
List<QualifierSpec> spec = new ArrayList<QualifierSpec>();
spec.add(new QualifierSpec(tableFamily1, "", "%{attr1}"));
spec.add(new QualifierSpec(tableFamily2, "", "%{attr2}"));
HBaseSink snk = new HBaseSink(tableName, "%{rowkey}", spec, 0L, false,
hbaseEnv.conf);
testingUtility.getConfiguration());
shipThreeEvents(snk);

// verify that the events made it into hbase
HTable table = new HTable(hbaseEnv.conf, tableName);
HTable table = new HTable(testingUtility.getConfiguration(), tableName);
try {
for (long i = 0; i <= 2; i++) {
Result r = table.get(new Get(Bytes.toBytes("row-key" + i)));
Expand All @@ -188,37 +181,36 @@ public void testSinkEmptyCol() throws IOException, InterruptedException {
}
} finally {
table.close();
admin.disableTable(tableName);
admin.deleteTable(tableName);
testingUtility.getHBaseAdmin().disableTable(tableName);
testingUtility.getHBaseAdmin().deleteTable(tableName);
}
}

/**
* Write events to a sink directly, verify by scanning HBase table. x
*/
// @Test
@Test
public void testSinkEscaping() throws IOException, InterruptedException {
final String tableName = "testSinkEscapingTab";
final String tableFamily1 = "family1";
final String tableFamily2 = "family2";

// create the table and column family to be used by sink
HTableDescriptor desc = new HTableDescriptor(tableName);
desc.addFamily(new HColumnDescriptor(tableFamily1));
desc.addFamily(new HColumnDescriptor(tableFamily2));
HBaseAdmin admin = new HBaseAdmin(hbaseEnv.conf);
admin.createTable(desc);

System.out.println("Starting.. testSinkTab:");
byte[][] args = new byte[2][];
args[0]=tableFamily1.getBytes();
args[1]=tableFamily2.getBytes();
testingUtility.createTable(tableName.getBytes(),args);
System.out.println("Created table");
// explicit constructor rather than builder - we want to control the conf
List<QualifierSpec> spec = new ArrayList<QualifierSpec>();
spec.add(new QualifierSpec(tableFamily1, "%{priority}", "%{body}"));
spec.add(new QualifierSpec(tableFamily2, "col2", "%{badescape}"));
HBaseSink snk = new HBaseSink(tableName, "%{host}-%{rowkey}", spec, 0L,
false, hbaseEnv.conf);
false, testingUtility.getConfiguration());
shipThreeEvents(snk);

// verify that the events made it into hbase
HTable table = new HTable(hbaseEnv.conf, tableName);
HTable table = new HTable(testingUtility.getConfiguration(), tableName);
try {
for (long i = 0; i <= 2; i++) {
Result r = table.get(new Get(Bytes.toBytes(DEFAULT_HOST + "-row-key"
Expand All @@ -235,12 +227,12 @@ public void testSinkEscaping() throws IOException, InterruptedException {
}
} finally {
table.close();
admin.disableTable(tableName);
admin.deleteTable(tableName);
testingUtility.getHBaseAdmin().disableTable(tableName);
testingUtility.getHBaseAdmin().deleteTable(tableName);
}
}

// @Test(expected = TableNotFoundException.class)
@Test(expected = TableNotFoundException.class)
public void testOpenFailBadTable() throws IOException {
final String tableFamily1 = "family1";
final String tableFamily2 = "family2";
Expand All @@ -250,7 +242,7 @@ public void testOpenFailBadTable() throws IOException {
spec.add(new QualifierSpec(tableFamily1, "col1", "%{attr1}"));
spec.add(new QualifierSpec(tableFamily2, "col2", "%{attr2}")); // invalid
HBaseSink snk = new HBaseSink("bogus table name", "%{rowkey}", spec, 0L,
false, hbaseEnv.conf);
false, testingUtility.getConfiguration());
shipThreeEvents(snk);
}

Expand All @@ -260,21 +252,17 @@ public void testOpenFailBadColFam() throws IOException {
final String tableFamily1 = "family1";
final String tableFamily2 = "family2";

// create the table and column family to be used by sink
HTableDescriptor desc = new HTableDescriptor(tableName);
desc.addFamily(new HColumnDescriptor(tableFamily1));
HBaseAdmin admin = new HBaseAdmin(hbaseEnv.conf);
admin.createTable(desc);

testingUtility.createTable(tableName.getBytes(),tableFamily1.getBytes());
System.out.println("Created table");
// explicit constructor rather than builder - we want to control the conf
List<QualifierSpec> spec = new ArrayList<QualifierSpec>();
spec.add(new QualifierSpec(tableFamily1, "col1", "%{attr1}"));
spec.add(new QualifierSpec(tableFamily2, "col2", "%{attr2}")); // invalid
HBaseSink snk = new HBaseSink(tableName, "%{rowkey}", spec, 0L, false,
hbaseEnv.conf);
testingUtility.getConfiguration());
shipThreeEvents(snk);
admin.disableTable(tableName);
admin.deleteTable(tableName);
testingUtility.getHBaseAdmin().disableTable(tableName);
testingUtility.getHBaseAdmin().deleteTable(tableName);
}

}
2 changes: 1 addition & 1 deletion pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -556,7 +556,7 @@
<dependency>
<groupId>com.google.guava</groupId>
<artifactId>guava</artifactId>
<version>r07</version>
<version>11.0.2</version>
</dependency>

<dependency>
Expand Down

0 comments on commit 9b27085

Please sign in to comment.