diff --git a/src/main/java/com/alipay/oceanbase/hbase/OHTable.java b/src/main/java/com/alipay/oceanbase/hbase/OHTable.java index d0463ab5..e2294ff0 100644 --- a/src/main/java/com/alipay/oceanbase/hbase/OHTable.java +++ b/src/main/java/com/alipay/oceanbase/hbase/OHTable.java @@ -1446,8 +1446,15 @@ private boolean checkAndMutation(byte[] row, byte[] family, byte[] qualifier, } @Override - public void mutateRow(RowMutations rm) { - throw new FeatureNotSupportedException("not supported yet."); + public void mutateRow(RowMutations rm) throws IOException { + List mutations = rm.getMutations(); + for (Mutation mutation : mutations) { + if (!(mutation instanceof Delete ||mutation instanceof Put)) { + throw new FeatureNotSupportedException("only put and delete is supported in mutateRow"); + } + } + Object[] results = new Object[mutations.size()]; + batch(mutations, results); } /** @@ -1964,6 +1971,14 @@ private ObTableQuery buildObTableQuery(final Get get, Collection columnQ ObTableQuery obTableQuery; ObHTableFilter filter = buildObHTableFilter(get.getFilter(), get.getTimeRange(), get.getMaxVersions(), columnQualifiers); + + if (get.getMaxResultsPerColumnFamily() > 0) { + filter.setLimitPerRowPerCf(get.getMaxResultsPerColumnFamily()); + } + if (get.getRowOffsetPerColumnFamily() > 0) { + filter.setOffsetPerRowPerCf(get.getRowOffsetPerColumnFamily()); + } + if (get.isClosestRowBefore()) { obTableQuery = buildObTableQuery(filter, HConstants.EMPTY_BYTE_ARRAY, true, get.getRow(), true, true, get.getTimeRange()); diff --git a/src/test/java/com/alipay/oceanbase/hbase/HTableTestBase.java b/src/test/java/com/alipay/oceanbase/hbase/HTableTestBase.java index 4dd3e0f7..25eb7377 100644 --- a/src/test/java/com/alipay/oceanbase/hbase/HTableTestBase.java +++ b/src/test/java/com/alipay/oceanbase/hbase/HTableTestBase.java @@ -17,6 +17,9 @@ package com.alipay.oceanbase.hbase; +import com.alipay.oceanbase.hbase.exception.FeatureNotSupportedException; +import com.alipay.oceanbase.hbase.util.ObHTableTestUtil; +import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hbase.*; import org.apache.hadoop.hbase.client.*; import org.apache.hadoop.hbase.filter.*; @@ -40,6 +43,9 @@ import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicInteger; +import static com.alipay.oceanbase.hbase.OHTableAdminInterfaceTest.closeHbaseAdminDDL; +import static com.alipay.oceanbase.hbase.OHTableAdminInterfaceTest.openHbaseAdminDDL; +import static com.alipay.oceanbase.hbase.util.ObHTableSecondaryPartUtil.*; import static org.apache.hadoop.hbase.filter.FilterList.Operator.MUST_PASS_ALL; import static org.apache.hadoop.hbase.filter.FilterList.Operator.MUST_PASS_ONE; import static org.apache.hadoop.hbase.util.Bytes.toBytes; @@ -5975,7 +5981,7 @@ public void testExist() throws IOException { Assert.assertFalse(hTable.exists(get)); } - @Ignore + @Test public void testMutateRow() throws IOException { String column1 = "mutationRowColumn1"; @@ -6054,6 +6060,257 @@ public void testMutateRow() throws IOException { hTable.delete(deleteFamily); } + @Test + public void testMutateRow2() throws Exception { + Configuration conf = ObHTableTestUtil.newConfiguration(); + Connection connection = ConnectionFactory.createConnection(conf); + Admin admin = connection.getAdmin(); + + TableName tableName = TableName.valueOf(Bytes.toBytes("testMutateRow2")); + String cf1 = "cf1"; + String cf2 = "cf2"; + String rowKey = "rowKey"; + String col1 = "col1"; + String col2 = "col2"; + String value = "hello"; + + HColumnDescriptor hcd1 = new HColumnDescriptor(cf1); + HColumnDescriptor hcd2 = new HColumnDescriptor(cf2); + + HTableDescriptor htd = new HTableDescriptor(tableName); + htd.addFamily(hcd1); + htd.addFamily(hcd2); + Table table = connection.getTable(tableName); + + try { + openHbaseAdminDDL(); + admin.createTable(htd); + + // 1. mutateRow using put + RowMutations rm1 = new RowMutations(rowKey.getBytes()); + Put put1 = new Put(rowKey.getBytes()); + Put put2 = new Put(rowKey.getBytes()); + put1.addColumn(cf1.getBytes(), col1.getBytes(), value.getBytes()); + put2.addColumn(cf1.getBytes(), col2.getBytes(), value.getBytes()); + rm1.add(put1); + rm1.add(put2); + table.mutateRow(rm1); + Get get1 = new Get(rowKey.getBytes()); + Result result = table.get(get1); + Assert.assertEquals(2, result.rawCells().length); + AssertKeyValue(rowKey, col1, value, result.rawCells()[0]); + AssertKeyValue(rowKey, col2, value, result.rawCells()[1]); + + // 2. mutateRow using delete + rm1 = new RowMutations(rowKey.getBytes()); + Delete del1 = new Delete(rowKey.getBytes()); + Delete del2 = new Delete(rowKey.getBytes()); + del1.addColumn(cf1.getBytes(), col1.getBytes()); + del2.addColumn(cf1.getBytes(), col2.getBytes()); + rm1.add(del1); + rm1.add(del2); + table.mutateRow(rm1); + get1 = new Get(rowKey.getBytes()); + result = table.get(get1); + Assert.assertEquals(0, result.rawCells().length); + + // 3. mutateRow using put and delete + rm1 = new RowMutations(rowKey.getBytes()); + put1 = new Put(rowKey.getBytes()); + put1.addColumn(cf1.getBytes(), col1.getBytes(), value.getBytes()); + del1 = new Delete(rowKey.getBytes()); + del1.addColumn(cf1.getBytes(), col1.getBytes()); + put2 = new Put(rowKey.getBytes()); + put2.addColumn(cf1.getBytes(), col2.getBytes(), value.getBytes()); + rm1.add(put1); + rm1.add(del1); + rm1.add(put2); + table.mutateRow(rm1); + get1 = new Get(rowKey.getBytes()); + result = table.get(get1); + Assert.assertEquals(1, result.rawCells().length); + AssertKeyValue(rowKey, col2, value, result.rawCells()[0]); + + // 4. mutateRow with multi-cf put and delete + rm1 = new RowMutations(rowKey.getBytes()); + put1 = new Put(rowKey.getBytes()); + put1.addColumn(cf1.getBytes(), col1.getBytes(), value.getBytes()); + put1.addColumn(cf2.getBytes(), col2.getBytes(), value.getBytes()); + + del1 = new Delete(rowKey.getBytes()); + + put2 = new Put(rowKey.getBytes()); + put2.addColumn(cf1.getBytes(), col1.getBytes(), value.getBytes()); + put2.addColumn(cf2.getBytes(), col2.getBytes(), value.getBytes()); + + del2 = new Delete(rowKey.getBytes()); + del2.addFamily(cf2.getBytes()); + + rm1.add(put1); + rm1.add(del1); + rm1.add(put2); + rm1.add(del2); + table.mutateRow(rm1); + get1 = new Get(rowKey.getBytes()); + result = table.get(get1); + Assert.assertEquals(1, result.rawCells().length); + AssertKeyValue(rowKey, col1, value, result.rawCells()[0]); + Assert.assertEquals(Bytes.toString(CellUtil.cloneFamily(result.rawCells()[0])), cf1); + + // 5. mutateRow with increment/append, should be not supported + Increment increment = new Increment(rowKey.getBytes()); + increment.addColumn(cf1.getBytes(), col1.getBytes(), 1L); + rm1.add(increment); + try { + table.mutateRow(rm1); + fail(); + } catch (Exception e) { + Assert.assertEquals(FeatureNotSupportedException.class, e.getClass()); + Assert.assertEquals("only put and delete is supported in mutateRow", e.getMessage()); + } + + Append append = new Append(rowKey.getBytes()); + append.addColumn(cf1.getBytes(), col1.getBytes(), "value".getBytes()); + rm1.add(put1); + rm1.add(del1); + rm1.add(append); + try { + table.mutateRow(rm1); + fail(); + } catch (Exception e) { + Assert.assertEquals(FeatureNotSupportedException.class, e.getClass()); + Assert.assertEquals("only put and delete is supported in mutateRow", e.getMessage()); + } + + // 6. mutateRow with different rowKey, should failed + put1 = new Put((rowKey + "_1").getBytes()); + try { + rm1.add(put1); + fail(); + } catch (Exception e) { + Assert.assertEquals(WrongRowIOException.class, e.getClass()); + Assert.assertEquals("The row in the recently added Put/Delete doesn't match the original one ", e.getMessage()); + } + + } catch (Exception e) { + e.printStackTrace(); + fail(); + } finally { + if (admin.tableExists(tableName)) { + if (admin.isTableEnabled(tableName)) { + admin.disableTable(tableName); + } + admin.deleteTable(tableName); + } + closeHbaseAdminDDL(); + } + } + + @Test + public void testGetWithCFOffsetLimit() throws Exception { + Configuration conf = ObHTableTestUtil.newConfiguration(); + Connection connection = ConnectionFactory.createConnection(conf); + Admin admin = connection.getAdmin(); + + TableName tableName = TableName.valueOf(Bytes.toBytes("testGetWithCFOffsetLimit")); + String cf1 = "cf1"; + String cf2 = "cf2"; + String rowKey = "rowKey"; + String[] cf1cols = {"col11", "col12", "col13", "col14", "col15"}; + String[] cf2cols = {"col21", "col22", "col23", "col24", "col25"}; + String cf1values = "cf1_value"; + String cf2values = "cf2_value"; + + HColumnDescriptor hcd1 = new HColumnDescriptor(cf1); + HColumnDescriptor hcd2 = new HColumnDescriptor(cf2); + + HTableDescriptor htd = new HTableDescriptor(tableName); + htd.addFamily(hcd1); + htd.addFamily(hcd2); + Table table = connection.getTable(tableName); + try { + openHbaseAdminDDL(); + admin.createTable(htd); + Put put = new Put(rowKey.getBytes()); + for (int i = 0; i < cf1cols.length; i++) { + put.addColumn(cf1.getBytes(), cf1cols[i].getBytes(), cf1values.getBytes()); + put.addColumn(cf2.getBytes(), cf2cols[i].getBytes(), cf2values.getBytes()); + } + table.put(put); + + // 1. set limit sinlge of + Get get = new Get(Bytes.toBytes(rowKey)); + get.addFamily(cf1.getBytes()); + get.setMaxResultsPerColumnFamily(3); + Result result = table.get(get); + Assert.assertEquals(3, result.rawCells().length); + AssertKeyValue(rowKey, cf1cols[0], cf1values, result.rawCells()[0]); + AssertKeyValue(rowKey, cf1cols[1], cf1values, result.rawCells()[1]); + AssertKeyValue(rowKey, cf1cols[2], cf1values, result.rawCells()[2]); + + // 2. set offset single of + get = new Get(Bytes.toBytes(rowKey)); + get.addFamily(cf1.getBytes()); + get.setRowOffsetPerColumnFamily(3); + result = table.get(get); + Assert.assertEquals(2, result.rawCells().length); + AssertKeyValue(rowKey, cf1cols[3], cf1values, result.rawCells()[0]); + AssertKeyValue(rowKey, cf1cols[4], cf1values, result.rawCells()[1]); + + // 3. set limit and offset single of + get = new Get(Bytes.toBytes(rowKey)); + get.addFamily(cf1.getBytes()); + get.setRowOffsetPerColumnFamily(3); + get.setMaxResultsPerColumnFamily(1); + result = table.get(get); + Assert.assertEquals(1, result.rawCells().length); + AssertKeyValue(rowKey, cf1cols[3], cf1values, result.rawCells()[0]); + + // 4. set limit multi-cf + get = new Get(Bytes.toBytes(rowKey)); + get.setMaxResultsPerColumnFamily(3); + result = table.get(get); + Assert.assertEquals(6, result.rawCells().length); + AssertKeyValue(rowKey, cf1cols[0], cf1values, result.rawCells()[0]); + AssertKeyValue(rowKey, cf1cols[1], cf1values, result.rawCells()[1]); + AssertKeyValue(rowKey, cf1cols[2], cf1values, result.rawCells()[2]); + AssertKeyValue(rowKey, cf2cols[0], cf2values, result.rawCells()[3]); + AssertKeyValue(rowKey, cf2cols[1], cf2values, result.rawCells()[4]); + AssertKeyValue(rowKey, cf2cols[2], cf2values, result.rawCells()[5]); + + // 5. set offset multi-cf + get = new Get(Bytes.toBytes(rowKey)); + get.setRowOffsetPerColumnFamily(3); + result = table.get(get); + Assert.assertEquals(4, result.rawCells().length); + AssertKeyValue(rowKey, cf1cols[3], cf1values, result.rawCells()[0]); + AssertKeyValue(rowKey, cf1cols[4], cf1values, result.rawCells()[1]); + AssertKeyValue(rowKey, cf2cols[3], cf2values, result.rawCells()[2]); + AssertKeyValue(rowKey, cf2cols[4], cf2values, result.rawCells()[3]); + + // 6. set limit and offset multi-cf + get = new Get(Bytes.toBytes(rowKey)); + get.setRowOffsetPerColumnFamily(3); + get.setMaxResultsPerColumnFamily(1); + result = table.get(get); + Assert.assertEquals(2, result.rawCells().length); + AssertKeyValue(rowKey, cf1cols[3], cf1values, result.rawCells()[0]); + AssertKeyValue(rowKey, cf2cols[3], cf2values, result.rawCells()[1]); + + } catch (Exception e) { + e.printStackTrace(); + fail(); + } finally { + if (admin.tableExists(tableName)) { + if (admin.isTableEnabled(tableName)) { + admin.disableTable(tableName); + } + admin.deleteTable(tableName); + } + closeHbaseAdminDDL(); + } + } + @Test public void testQualifyNull() throws Exception { // delete 只支持删一行