Skip to content

Commit 5b2487f

Browse files
authored
Merge pull request #140 from stuBirdFly/client_2_1_0
hbase support cell TTL
2 parents 68943f3 + 97feb20 commit 5b2487f

File tree

6 files changed

+411
-125
lines changed

6 files changed

+411
-125
lines changed

src/main/java/com/alipay/oceanbase/hbase/OHTable.java

Lines changed: 155 additions & 106 deletions
Large diffs are not rendered by default.

src/main/java/com/alipay/oceanbase/hbase/constants/OHConstants.java

Lines changed: 2 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -72,16 +72,6 @@ public final class OHConstants {
7272
*/
7373
public static final String HBASE_OCEANBASE_DATABASE = "hbase.oceanbase.database";
7474

75-
/**
76-
* ocenbase hbase model is consist of following columns
77-
* K hbase row key
78-
* Q hbase qualifier
79-
* T hbase timeStamp
80-
* V hbase value
81-
*/
82-
public static final String[] ALL_COLUMNS = new String[] { "K",
83-
"Q", "T", "V" };
84-
8575
/**
8676
* ocenbase hbase model rowkey column is consist of following column
8777
* K, Q, T hbase value
@@ -94,6 +84,8 @@ public final class OHConstants {
9484
* V hbase value
9585
*/
9686
public static final String[] V_COLUMNS = new String[] { "V" };
87+
public static final String[] PROPERTY_COLUMNS = new String[] { "V",
88+
"TTL" };
9789

9890
public static final String HBASE_HTABLE_POOL_SEPERATOR = "$";
9991

src/main/java/com/alipay/oceanbase/hbase/result/ClientStreamScanner.java

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -24,14 +24,15 @@
2424
import com.alipay.oceanbase.rpc.stream.ObTableClientQueryAsyncStreamResult;
2525
import com.alipay.oceanbase.rpc.stream.ObTableClientQueryStreamResult;
2626
import org.apache.hadoop.classification.InterfaceAudience;
27-
import org.apache.hadoop.hbase.Cell;
28-
import org.apache.hadoop.hbase.KeyValue;
27+
import org.apache.hadoop.hbase.*;
2928
import org.apache.hadoop.hbase.client.AbstractClientScanner;
3029
import org.apache.hadoop.hbase.client.Result;
3130
import org.apache.hadoop.hbase.client.Scan;
3231
import org.apache.hadoop.hbase.util.Bytes;
32+
import org.mortbay.util.SingletonList;
3333
import org.slf4j.Logger;
3434
import java.io.IOException;
35+
import java.nio.ByteBuffer;
3536
import java.util.*;
3637

3738
import static com.alipay.oceanbase.hbase.util.TableHBaseLoggerFactory.LCD;
@@ -101,7 +102,6 @@ public Result next() throws IOException {
101102
byte[] sq = familyAndQualifier[1];
102103
long st = (Long) startRow.get(2).getValue();
103104
byte[] sv = (byte[]) startRow.get(3).getValue();
104-
105105
KeyValue startKeyValue = new KeyValue(sk, family, sq, st, sv);
106106
List<Cell> keyValues = new ArrayList<Cell>();
107107
keyValues.add(startKeyValue);

src/test/java/com/alipay/oceanbase/hbase/HTableTestBase.java

Lines changed: 241 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,6 @@
1717

1818
package com.alipay.oceanbase.hbase;
1919

20-
import com.alipay.oceanbase.hbase.exception.FeatureNotSupportedException;
2120
import org.apache.hadoop.hbase.*;
2221
import org.apache.hadoop.hbase.client.*;
2322
import org.apache.hadoop.hbase.filter.*;
@@ -5619,6 +5618,243 @@ public void testAppend() throws IOException {
56195618
}
56205619
}
56215620

5621+
@Test
5622+
public void testCellTTL() throws Exception {
5623+
String key1 = "key1";
5624+
String column1 = "cf1";
5625+
String column2 = "cf2";
5626+
String column3 = "cf3";
5627+
String family = "cellTTLFamily";
5628+
String value1 = "v1";
5629+
String value2 = "v2";
5630+
String app = "app";
5631+
5632+
Result r;
5633+
Put put1 = new Put(key1.getBytes());
5634+
put1.addColumn(family.getBytes(), column1.getBytes(), toBytes(11L));
5635+
put1.setTTL(5000);
5636+
Put put2 = new Put(key1.getBytes());
5637+
put2.addColumn(family.getBytes(), column1.getBytes(), toBytes(22L));
5638+
put2.addColumn(family.getBytes(), column2.getBytes(), toBytes(33L));
5639+
put2.setTTL(10000);
5640+
Put put3 = new Put(key1.getBytes());
5641+
put3.addColumn(family.getBytes(), column1.getBytes(), toBytes(11L));
5642+
put3.setTTL(-3000);
5643+
Put put4 = new Put(key1.getBytes());
5644+
put4.addColumn(family.getBytes(), column1.getBytes(), toBytes(11L));
5645+
put4.setTTL(0);
5646+
Put errorPut = new Put(key1.getBytes());
5647+
errorPut.addColumn("family1".getBytes(), column1.getBytes(), toBytes(11L));
5648+
errorPut.setTTL(10);
5649+
5650+
Get get = new Get(key1.getBytes());
5651+
get.addFamily(family.getBytes());
5652+
get.setMaxVersions(10);
5653+
try {
5654+
tryPut(hTable, errorPut);
5655+
} catch (Exception e) {
5656+
assertTrue(e.getCause().toString().contains("Unknown column 'TTL'"));
5657+
}
5658+
// test put and get
5659+
tryPut(hTable, put1);
5660+
tryPut(hTable, put2);
5661+
tryPut(hTable, put3);
5662+
tryPut(hTable, put4);
5663+
r = hTable.get(get);
5664+
assertEquals(3, r.size());
5665+
Thread.sleep(5000);
5666+
r = hTable.get(get);
5667+
assertEquals(2, r.size());
5668+
Thread.sleep(5000);
5669+
r = hTable.get(get);
5670+
assertEquals(0, r.size());
5671+
5672+
// test increment
5673+
tryPut(hTable, put1);
5674+
tryPut(hTable, put2);
5675+
Thread.sleep(1000);
5676+
Increment increment = new Increment(key1.getBytes());
5677+
increment.addColumn(family.getBytes(), column1.getBytes(), 1L);
5678+
increment.addColumn(family.getBytes(), column2.getBytes(), 2L);
5679+
increment.addColumn(family.getBytes(), column3.getBytes(), 5L);
5680+
increment.setTTL(-5000);
5681+
hTable.increment(increment);
5682+
increment.setTTL(5000);
5683+
hTable.increment(increment);
5684+
get.setMaxVersions(1);
5685+
r = hTable.get(get);
5686+
5687+
assertEquals(3, r.size());
5688+
assertEquals(23L, Bytes.toLong(CellUtil.cloneValue(r.getColumnCells(family.getBytes(),
5689+
column1.getBytes()).get(0))));
5690+
assertEquals(35L, Bytes.toLong(CellUtil.cloneValue(r.getColumnCells(family.getBytes(),
5691+
column2.getBytes()).get(0))));
5692+
assertEquals(5L, Bytes.toLong(CellUtil.cloneValue(r.getColumnCells(family.getBytes(),
5693+
column3.getBytes()).get(0))));
5694+
5695+
Thread.sleep(10000);
5696+
r = hTable.get(get);
5697+
assertEquals(0, r.size());
5698+
5699+
increment = new Increment(key1.getBytes());
5700+
increment.addColumn(family.getBytes(), column1.getBytes(), 1L);
5701+
increment.addColumn(family.getBytes(), column2.getBytes(), 2L);
5702+
increment.setTTL(5000);
5703+
hTable.increment(increment);
5704+
r = hTable.get(get);
5705+
5706+
assertEquals(2, r.size());
5707+
assertEquals(1L, Bytes.toLong(CellUtil.cloneValue(r.getColumnCells(family.getBytes(),
5708+
column1.getBytes()).get(0))));
5709+
assertEquals(2L, Bytes.toLong(CellUtil.cloneValue(r.getColumnCells(family.getBytes(),
5710+
column2.getBytes()).get(0))));
5711+
5712+
Thread.sleep(5000);
5713+
r = hTable.get(get);
5714+
assertEquals(0, r.size());
5715+
5716+
tryPut(hTable, put1);
5717+
tryPut(hTable, put2);
5718+
increment.addColumn(family.getBytes(), column1.getBytes(), 4L);
5719+
hTable.increment(increment);
5720+
5721+
r = hTable.get(get);
5722+
assertEquals(2, r.size());
5723+
assertEquals(26L, Bytes.toLong(CellUtil.cloneValue(r.getColumnCells(family.getBytes(),
5724+
column1.getBytes()).get(0))));
5725+
assertEquals(35L, Bytes.toLong(CellUtil.cloneValue(r.getColumnCells(family.getBytes(),
5726+
column2.getBytes()).get(0))));
5727+
5728+
// test append
5729+
Thread.sleep(10000);
5730+
r = hTable.get(get);
5731+
assertEquals(0, r.size());
5732+
5733+
put3 = new Put(key1.getBytes());
5734+
put3.addColumn(family.getBytes(), column1.getBytes(), toBytes(value1));
5735+
put3.addColumn(family.getBytes(), column2.getBytes(), toBytes(value2));
5736+
put3.setTTL(10000);
5737+
tryPut(hTable, put3);
5738+
5739+
Append append = new Append(key1.getBytes());
5740+
KeyValue kv = new KeyValue(key1.getBytes(), family.getBytes(), column1.getBytes(),
5741+
app.getBytes());
5742+
append.add(kv);
5743+
append.setTTL(-3000);
5744+
hTable.append(append);
5745+
append.setTTL(3000);
5746+
hTable.append(append);
5747+
5748+
r = hTable.get(get);
5749+
assertEquals(2, r.size());
5750+
assertEquals(
5751+
value1 + app,
5752+
Bytes.toString(CellUtil.cloneValue(r.getColumnCells(family.getBytes(),
5753+
column1.getBytes()).get(0))));
5754+
5755+
Thread.sleep(3000);
5756+
r = hTable.get(get);
5757+
assertEquals(2, r.size());
5758+
assertEquals(
5759+
value1,
5760+
Bytes.toString(CellUtil.cloneValue(r.getColumnCells(family.getBytes(),
5761+
column1.getBytes()).get(0))));
5762+
5763+
Thread.sleep(7000);
5764+
r = hTable.get(get);
5765+
assertEquals(0, r.size());
5766+
5767+
append.add(family.getBytes(), column1.getBytes(), app.getBytes());
5768+
hTable.append(append);
5769+
r = hTable.get(get);
5770+
assertEquals(1, r.size());
5771+
assertEquals(
5772+
app,
5773+
Bytes.toString(CellUtil.cloneValue(r.getColumnCells(family.getBytes(),
5774+
column1.getBytes()).get(0))));
5775+
5776+
Thread.sleep(3000);
5777+
append.add(family.getBytes(), column2.getBytes(), app.getBytes());
5778+
hTable.append(append);
5779+
r = hTable.get(get);
5780+
assertEquals(2, r.size());
5781+
assertEquals(
5782+
app,
5783+
Bytes.toString(CellUtil.cloneValue(r.getColumnCells(family.getBytes(),
5784+
column1.getBytes()).get(0))));
5785+
assertEquals(
5786+
app,
5787+
Bytes.toString(CellUtil.cloneValue(r.getColumnCells(family.getBytes(),
5788+
column2.getBytes()).get(0))));
5789+
5790+
// test checkAndMutate
5791+
Thread.sleep(3000);
5792+
r = hTable.get(get);
5793+
assertEquals(0, r.size());
5794+
5795+
tryPut(hTable, put1);
5796+
RowMutations rowMutations = new RowMutations(key1.getBytes());
5797+
rowMutations.add(put2);
5798+
Delete delete = new Delete(key1.getBytes());
5799+
delete.addColumn(family.getBytes(), column1.getBytes());
5800+
rowMutations.add(delete);
5801+
boolean succ = hTable.checkAndMutate(key1.getBytes(), family.getBytes(),
5802+
column1.getBytes(), CompareFilter.CompareOp.EQUAL, toBytes(11L), rowMutations);
5803+
assertTrue(succ);
5804+
r = hTable.get(get);
5805+
assertEquals(r.size(), 2);
5806+
assertEquals(11L, Bytes.toLong(CellUtil.cloneValue(r.getColumnCells(family.getBytes(),
5807+
column1.getBytes()).get(0))));
5808+
assertEquals(33L, Bytes.toLong(CellUtil.cloneValue(r.getColumnCells(family.getBytes(),
5809+
column2.getBytes()).get(0))));
5810+
5811+
Thread.sleep(10000);
5812+
r = hTable.get(get);
5813+
assertEquals(r.size(), 0);
5814+
5815+
tryPut(hTable, put1);
5816+
rowMutations = new RowMutations(key1.getBytes());
5817+
put4 = new Put(key1.getBytes());
5818+
put4.addColumn(family.getBytes(), column1.getBytes(), toBytes(22L));
5819+
put4.addColumn(family.getBytes(), column2.getBytes(), toBytes(33L));
5820+
put4.setTTL(10000);
5821+
rowMutations.add(put4);
5822+
succ = hTable.checkAndMutate(key1.getBytes(), family.getBytes(), column1.getBytes(),
5823+
CompareFilter.CompareOp.EQUAL, toBytes(1L), rowMutations);
5824+
assertFalse(succ);
5825+
succ = hTable.checkAndMutate(key1.getBytes(), family.getBytes(), column1.getBytes(),
5826+
CompareFilter.CompareOp.EQUAL, toBytes(11L), rowMutations);
5827+
assertTrue(succ);
5828+
5829+
r = hTable.get(get);
5830+
assertEquals(r.size(), 2);
5831+
assertEquals(22L, Bytes.toLong(CellUtil.cloneValue(r.getColumnCells(family.getBytes(),
5832+
column1.getBytes()).get(0))));
5833+
assertEquals(33L, Bytes.toLong(CellUtil.cloneValue(r.getColumnCells(family.getBytes(),
5834+
column2.getBytes()).get(0))));
5835+
5836+
Thread.sleep(5000);
5837+
r = hTable.get(get);
5838+
assertEquals(2, r.size());
5839+
assertEquals(22L, Bytes.toLong(CellUtil.cloneValue(r.getColumnCells(family.getBytes(),
5840+
column1.getBytes()).get(0))));
5841+
assertEquals(33L, Bytes.toLong(CellUtil.cloneValue(r.getColumnCells(family.getBytes(),
5842+
column2.getBytes()).get(0))));
5843+
5844+
Thread.sleep(5000);
5845+
r = hTable.get(get);
5846+
assertEquals(r.size(), 0);
5847+
put1 = new Put(key1.getBytes());
5848+
put1.addColumn(family.getBytes(), column1.getBytes(), toBytes(11L));
5849+
tryPut(hTable, put1);
5850+
5851+
increment = new Increment(key1.getBytes());
5852+
increment.addColumn(family.getBytes(), column1.getBytes(), 1L);
5853+
hTable.increment(increment);
5854+
r = hTable.get(get);
5855+
assertEquals(r.size(), 1);
5856+
}
5857+
56225858
@Test
56235859
public void testIncrement() throws IOException {
56245860
String column = "incrementColumn";
@@ -5883,17 +6119,17 @@ public void testFamilyBlank() throws Exception {
58836119
try {
58846120
hTable.append(append);
58856121
fail();
5886-
} catch (FeatureNotSupportedException e) {
5887-
Assert.assertTrue(e.getMessage().contains("family is empty"));
6122+
} catch (IllegalArgumentException e) {
6123+
Assert.assertTrue(e.getMessage().contains("zero columns specified"));
58886124
}
58896125

58906126
Increment increment = new Increment(key.getBytes());
58916127
// increment.addColumn(null, null, 1);
58926128
try {
58936129
hTable.increment(increment);
58946130
fail();
5895-
} catch (FeatureNotSupportedException e) {
5896-
Assert.assertTrue(e.getMessage().contains("family is empty"));
6131+
} catch (IllegalArgumentException e) {
6132+
Assert.assertTrue(e.getMessage().contains("zero columns specified"));
58976133
}
58986134
}
58996135

src/test/java/com/alipay/oceanbase/hbase/OHTableTest.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -47,7 +47,7 @@ public static void setup() throws Exception {
4747
multiCfHTable = new OHTable(c, "test_multi_cf");
4848
List<String> tableGroups = new LinkedList<>();
4949
tableGroups.add("test");
50-
// tableGroups.add("test_multi_cf");
50+
tableGroups.add("test_multi_cf");
5151
ObHTableTestUtil.prepareClean(tableGroups);
5252
}
5353

src/test/java/unit_test_db.sql

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,15 @@ CREATE TABLE `test$family1` (
2222
PRIMARY KEY (`K`, `Q`, `T`)
2323
);
2424

25+
CREATE TABLE `test$cellTTLFamily` (
26+
`K` varbinary(1024) NOT NULL,
27+
`Q` varbinary(256) NOT NULL,
28+
`T` bigint(20) NOT NULL,
29+
`V` varbinary(1024) DEFAULT NULL,
30+
`TTL` bigint(20) DEFAULT NULL,
31+
PRIMARY KEY (`K`, `Q`, `T`)
32+
);
33+
2534
CREATE TABLE `test_t$partitionFamily1` (
2635
`K` varbinary(1024) NOT NULL,
2736
`Q` varbinary(256) NOT NULL,

0 commit comments

Comments
 (0)