Skip to content

Commit 65cc24a

Browse files
committed
add scan lease test (#53)
1 parent da500ea commit 65cc24a

File tree

5 files changed

+154
-52
lines changed

5 files changed

+154
-52
lines changed

src/main/java/com/alipay/oceanbase/hbase/OHTable.java

Lines changed: 36 additions & 47 deletions
Original file line numberDiff line numberDiff line change
@@ -479,7 +479,7 @@ public Result call() throws IOException {
479479
filter = buildObHTableFilter(get.getFilter(), get.getTimeRange(),
480480
get.getMaxVersions(), null);
481481
obTableQuery = buildObTableQuery(filter, get.getRow(), true, get.getRow(),
482-
true, -1);
482+
true);
483483
request = buildObTableQueryRequest(obTableQuery,
484484
getTargetTableName(tableNameString));
485485

@@ -489,14 +489,12 @@ public Result call() throws IOException {
489489
} else {
490490
for (Map.Entry<byte[], NavigableSet<byte[]>> entry : get.getFamilyMap()
491491
.entrySet()) {
492-
493492
family = entry.getKey();
494-
495493
filter = buildObHTableFilter(get.getFilter(), get.getTimeRange(),
496494
get.getMaxVersions(), entry.getValue());
497495

498496
obTableQuery = buildObTableQuery(filter, get.getRow(), true,
499-
get.getRow(), true, -1);
497+
get.getRow(), true);
500498

501499
request = buildObTableQueryRequest(obTableQuery,
502500
getTargetTableName(tableNameString, Bytes.toString(family)));
@@ -540,8 +538,7 @@ public Result getRowOrBefore(byte[] row, byte[] family) {
540538

541539
@Override
542540
public ResultScanner getScanner(final Scan scan) throws IOException {
543-
544-
if (scan.getFamilyMap().keySet() == null || scan.getFamilyMap().keySet().size() == 0) {
541+
if (scan.getFamilyMap().keySet().isEmpty()) {
545542
// check nothing, use table group;
546543
} else {
547544
checkFamilyViolation(scan.getFamilyMap().keySet());
@@ -558,20 +555,11 @@ public ResultScanner call() throws IOException {
558555
ObTableQuery obTableQuery;
559556
ObHTableFilter filter;
560557
try {
561-
if (scan.getFamilyMap().keySet() == null
562-
|| scan.getFamilyMap().keySet().size() == 0) {
558+
if (scan.getFamilyMap().keySet().isEmpty()) {
563559
filter = buildObHTableFilter(scan.getFilter(), scan.getTimeRange(),
564560
scan.getMaxVersions(), null);
565-
if (scan.isReversed()) {
566-
obTableQuery = buildObTableQuery(filter, scan.getStopRow(), false,
567-
scan.getStartRow(), true, scan.getBatch());
568-
} else {
569-
obTableQuery = buildObTableQuery(filter, scan.getStartRow(), true,
570-
scan.getStopRow(), false, scan.getBatch());
571-
}
572-
if (scan.isReversed()) { // reverse scan 时设置为逆序
573-
obTableQuery.setScanOrder(ObScanOrder.Reverse);
574-
}
561+
obTableQuery = buildObTableQuery(filter, scan);
562+
575563
request = buildObTableQueryAsyncRequest(obTableQuery,
576564
getTargetTableName(tableNameString));
577565
clientQueryAsyncStreamResult = (ObTableClientQueryAsyncStreamResult) obTableClient
@@ -584,19 +572,7 @@ public ResultScanner call() throws IOException {
584572
family = entry.getKey();
585573
filter = buildObHTableFilter(scan.getFilter(), scan.getTimeRange(),
586574
scan.getMaxVersions(), entry.getValue());
587-
if (scan.isReversed()) {
588-
obTableQuery = buildObTableQuery(filter, scan.getStopRow(), false,
589-
scan.getStartRow(), true, scan.getBatch());
590-
} else {
591-
obTableQuery = buildObTableQuery(filter, scan.getStartRow(), true,
592-
scan.getStopRow(), false, scan.getBatch());
593-
}
594-
if (scan.isReversed()) { // reverse scan 时设置为逆序
595-
obTableQuery.setScanOrder(ObScanOrder.Reverse);
596-
}
597-
598-
// no support set maxResultSize.
599-
// obTableQuery.setMaxResultSize(scan.getMaxResultSize());
575+
obTableQuery = buildObTableQuery(filter, scan);
600576

601577
request = buildObTableQueryAsyncRequest(obTableQuery,
602578
getTargetTableName(tableNameString, Bytes.toString(family)));
@@ -824,7 +800,7 @@ private boolean checkAndMutation(byte[] row, byte[] family, byte[] qualifier, Co
824800
keyValueList.addAll(entry.getValue());
825801
}
826802
}
827-
ObTableQuery obTableQuery = buildObTableQuery(filter, row, true, row, true, -1);
803+
ObTableQuery obTableQuery = buildObTableQuery(filter, row, true, row, true);
828804

829805
ObTableBatchOperation batch = buildObTableBatchOperation(keyValueList, false, null);
830806

@@ -862,7 +838,7 @@ public Result append(Append append) throws IOException {
862838
true, qualifiers);
863839
// the later hbase has supported timeRange
864840
ObHTableFilter filter = buildObHTableFilter(null, null, 1, qualifiers);
865-
ObTableQuery obTableQuery = buildObTableQuery(filter, r, true, r, true, -1);
841+
ObTableQuery obTableQuery = buildObTableQuery(filter, r, true, r, true);
866842
ObTableQueryAndMutate queryAndMutate = new ObTableQueryAndMutate();
867843
queryAndMutate.setTableQuery(obTableQuery);
868844
queryAndMutate.setMutations(batchOperation);
@@ -921,7 +897,7 @@ public Result increment(Increment increment) throws IOException {
921897
ObHTableFilter filter = buildObHTableFilter(null, increment.getTimeRange(), 1,
922898
qualifiers);
923899

924-
ObTableQuery obTableQuery = buildObTableQuery(filter, rowKey, true, rowKey, true, -1);
900+
ObTableQuery obTableQuery = buildObTableQuery(filter, rowKey, true, rowKey, true);
925901
ObTableQueryAndMutate queryAndMutate = new ObTableQueryAndMutate();
926902
queryAndMutate.setMutations(batch);
927903
queryAndMutate.setTableQuery(obTableQuery);
@@ -971,7 +947,7 @@ public long incrementColumnValue(byte[] row, byte[] family, byte[] qualifier, lo
971947

972948
ObHTableFilter filter = buildObHTableFilter(null, null, 1, qualifiers);
973949

974-
ObTableQuery obTableQuery = buildObTableQuery(filter, row, true, row, true, -1);
950+
ObTableQuery obTableQuery = buildObTableQuery(filter, row, true, row, true);
975951
ObTableQueryAndMutate queryAndMutate = new ObTableQueryAndMutate();
976952
queryAndMutate.setMutations(batch);
977953
queryAndMutate.setTableQuery(obTableQuery);
@@ -1373,8 +1349,7 @@ private ObHTableFilter buildObHTableFilter(String filterString, TimeRange timeRa
13731349
}
13741350

13751351
private ObTableQuery buildObTableQuery(ObHTableFilter filter, byte[] start,
1376-
boolean includeStart, byte[] stop, boolean includeStop,
1377-
int batchSize) {
1352+
boolean includeStart, byte[] stop, boolean includeStop) {
13781353
ObNewRange obNewRange = new ObNewRange();
13791354

13801355
if (Arrays.equals(start, HConstants.EMPTY_BYTE_ARRAY)) {
@@ -1394,23 +1369,37 @@ private ObTableQuery buildObTableQuery(ObHTableFilter filter, byte[] start,
13941369
} else {
13951370
obNewRange.setEndKey(ObRowKey.getInstance(stop, ObObj.getMin(), ObObj.getMin()));
13961371
}
1397-
1398-
return buildObTableQuery(filter, obNewRange, batchSize);
1399-
}
1400-
1401-
private ObTableQuery buildObTableQuery(ObHTableFilter filter, ObNewRange obNewRange,
1402-
int batchSize) {
14031372
ObTableQuery obTableQuery = new ObTableQuery();
14041373
obTableQuery.setIndexName("PRIMARY");
14051374
obTableQuery.sethTableFilter(filter);
14061375
for (String column : ALL_COLUMNS) {
14071376
obTableQuery.addSelectColumn(column);
14081377
}
1409-
if (obNewRange != null) {
1410-
obTableQuery.addKeyRange(obNewRange);
1378+
obTableQuery.addKeyRange(obNewRange);
1379+
1380+
return obTableQuery;
1381+
}
1382+
1383+
private ObTableQuery buildObTableQuery(ObHTableFilter filter, final Scan scan) {
1384+
ObTableQuery obTableQuery;
1385+
if (scan.getMaxResultsPerColumnFamily() > 0) {
1386+
filter.setLimitPerRowPerCf(scan.getMaxResultsPerColumnFamily());
1387+
}
1388+
if (scan.getRowOffsetPerColumnFamily() > 0) {
1389+
filter.setOffsetPerRowPerCf(scan.getRowOffsetPerColumnFamily());
1390+
}
1391+
if (scan.isReversed()) {
1392+
obTableQuery = buildObTableQuery(filter, scan.getStopRow(), false, scan.getStartRow(),
1393+
true);
1394+
} else {
1395+
obTableQuery = buildObTableQuery(filter, scan.getStartRow(), true, scan.getStopRow(),
1396+
false);
1397+
}
1398+
if (scan.isReversed()) { // reverse scan 时设置为逆序
1399+
obTableQuery.setScanOrder(ObScanOrder.Reverse);
14111400
}
1412-
if (batchSize > 0) {
1413-
obTableQuery.setBatchSize(batchSize);
1401+
if (scan.getBatch() > 0) {
1402+
obTableQuery.setBatchSize(scan.getBatch());
14141403
}
14151404
return obTableQuery;
14161405
}

src/main/java/com/alipay/oceanbase/hbase/result/ClientStreamScanner.java

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@
2424
import com.alipay.oceanbase.rpc.protocol.payload.impl.execute.query.AbstractQueryStreamResult;
2525
import com.alipay.oceanbase.rpc.stream.ObTableClientQueryAsyncStreamResult;
2626
import com.alipay.oceanbase.rpc.stream.ObTableClientQueryStreamResult;
27+
import org.apache.hadoop.classification.InterfaceAudience;
2728
import org.apache.hadoop.hbase.KeyValue;
2829
import org.apache.hadoop.hbase.client.AbstractClientScanner;
2930
import org.apache.hadoop.hbase.client.Result;
@@ -36,6 +37,7 @@
3637

3738
import static com.alipay.oceanbase.hbase.util.TableHBaseLoggerFactory.LCD;
3839

40+
@InterfaceAudience.Private
3941
public class ClientStreamScanner extends AbstractClientScanner {
4042

4143
private static final Logger logger = TableHBaseLoggerFactory

src/main/java/com/alipay/oceanbase/hbase/util/OHConnectionImpl.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -40,7 +40,7 @@ public class OHConnectionImpl implements Connection {
4040

4141
private static final Marker FATAL = MarkerFactory.getMarker("FATAL");
4242

43-
static final int BUFFERED_PARAM_UNSET = -1;
43+
private static final int BUFFERED_PARAM_UNSET = -1;
4444

4545
private volatile boolean closed;
4646
private volatile boolean aborted;

src/main/resources/hbase-site.xml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -22,4 +22,4 @@
2222
<name>hbase.client.connection.impl</name>
2323
<value>com.alipay.oceanbase.hbase.util.OHConnectionImpl</value>
2424
</property>
25-
</configuration>
25+
</configuration>

src/test/java/com/alipay/oceanbase/hbase/HTableTestBase.java

Lines changed: 114 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,6 @@
1818
package com.alipay.oceanbase.hbase;
1919

2020
import com.alipay.oceanbase.hbase.exception.FeatureNotSupportedException;
21-
2221
import org.apache.hadoop.hbase.Cell;
2322
import org.apache.hadoop.hbase.CellUtil;
2423
import org.apache.hadoop.hbase.HConstants;
@@ -43,8 +42,7 @@
4342
import static org.apache.hadoop.hbase.filter.FilterList.Operator.MUST_PASS_ALL;
4443
import static org.apache.hadoop.hbase.filter.FilterList.Operator.MUST_PASS_ONE;
4544
import static org.apache.hadoop.hbase.util.Bytes.toBytes;
46-
import static org.junit.Assert.assertEquals;
47-
import static org.junit.Assert.fail;
45+
import static org.junit.Assert.*;
4846

4947
public abstract class HTableTestBase {
5048

@@ -401,6 +399,8 @@ public void testMultiPartitionPut() throws IOException {
401399
}
402400
}
403401

402+
403+
404404
@Test
405405
public void testMultiPartitionDel() throws IOException {
406406
String[] keys = new String[] { "putKey1", "putKey2", "putKey3", "putKey4", "putKey5",
@@ -1330,6 +1330,87 @@ public void testGetFilter() throws Exception {
13301330
Assert.assertEquals(1, r.raw().length);
13311331
}
13321332

1333+
@Test
1334+
public void testScanSessionClean() throws Exception {
1335+
String key1 = "bKey";
1336+
String key2 = "cKey";
1337+
String key3 = "dKey";
1338+
String key4 = "eKey";
1339+
String key5 = "fKey";
1340+
String column1 = "column1";
1341+
String column2 = "column2";
1342+
String value1 = "value1";
1343+
String family = "family1";
1344+
1345+
// delete previous data
1346+
Delete deleteKey1Family = new Delete(toBytes(key1));
1347+
deleteKey1Family.deleteFamily(toBytes(family));
1348+
Delete deleteKey2Family = new Delete(toBytes(key2));
1349+
deleteKey2Family.deleteFamily(toBytes(family));
1350+
Delete deleteKey3Family = new Delete(toBytes(key3));
1351+
deleteKey3Family.deleteFamily(toBytes(family));
1352+
Delete deleteKey4Family = new Delete(toBytes(key4));
1353+
deleteKey4Family.deleteFamily(toBytes(family));
1354+
Delete deleteKey5Family = new Delete(toBytes(key5));
1355+
deleteKey5Family.deleteFamily(toBytes(family));
1356+
1357+
hTable.delete(deleteKey1Family);
1358+
hTable.delete(deleteKey2Family);
1359+
hTable.delete(deleteKey3Family);
1360+
hTable.delete(deleteKey4Family);
1361+
hTable.delete(deleteKey5Family);
1362+
1363+
Put putKey1Column1Value1 = new Put(toBytes(key1));
1364+
putKey1Column1Value1.add(toBytes(family), toBytes(column1), toBytes(value1));
1365+
1366+
Put putKey2Column2Value1 = new Put(toBytes(key2));
1367+
putKey2Column2Value1.add(toBytes(family), toBytes(column2), toBytes(value1));
1368+
1369+
Put putKey3Column2Value1 = new Put(toBytes(key3));
1370+
putKey3Column2Value1.add(toBytes(family), toBytes(column2), toBytes(value1));
1371+
1372+
Put putKey4Column2Value1 = new Put(toBytes(key4));
1373+
putKey4Column2Value1.add(toBytes(family), toBytes(column2), toBytes(value1));
1374+
1375+
Put putKey5Column2Value1 = new Put(toBytes(key5));
1376+
putKey5Column2Value1.add(toBytes(family), toBytes(column2), toBytes(value1));
1377+
1378+
tryPut(hTable, putKey1Column1Value1);
1379+
tryPut(hTable, putKey1Column1Value1);
1380+
tryPut(hTable, putKey2Column2Value1);
1381+
tryPut(hTable, putKey3Column2Value1);
1382+
tryPut(hTable, putKey4Column2Value1);
1383+
tryPut(hTable, putKey5Column2Value1);
1384+
1385+
Scan scan;
1386+
scan = new Scan();
1387+
scan.addFamily(family.getBytes());
1388+
scan.setMaxVersions(10);
1389+
scan.setBatch(1);
1390+
1391+
ResultScanner scanner = hTable.getScanner(scan);
1392+
scanner.next();
1393+
1394+
// The server defaults to a lease of 60 seconds. Therefore, at 20 seconds,
1395+
// the transaction is checked to ensure it has not rolled back, and the lease is updated.
1396+
// At 55 seconds, the query should still be able to retrieve the data and update the lease.
1397+
// If it exceeds 60 seconds (at 61 seconds), the session is deleted.
1398+
Thread.sleep(20 * 1000);
1399+
scanner.next();
1400+
1401+
Thread.sleep(55 * 1000);
1402+
scanner.next();
1403+
1404+
Thread.sleep(61 * 1000);
1405+
try {
1406+
scanner.next();
1407+
} catch (IOException e) {
1408+
assertTrue(e.getCause().getMessage().contains("OB_HASH_NOT_EXIST"));
1409+
} finally {
1410+
scanner.close();
1411+
}
1412+
}
1413+
13331414
@Test
13341415
public void testScan() throws Exception {
13351416
String key1 = "scanKey1x";
@@ -1480,6 +1561,36 @@ public void testScan() throws Exception {
14801561
Assert.assertEquals(res_count, 7);
14811562
scanner.close();
14821563

1564+
scan = new Scan();
1565+
scan.addFamily(family.getBytes());
1566+
scan.setStartRow("scanKey1x".getBytes());
1567+
scan.setStopRow("scanKey3x".getBytes());
1568+
scan.setMaxVersions(10);
1569+
scan.setMaxResultsPerColumnFamily(1);
1570+
scanner = hTable.getScanner(scan);
1571+
for (Result result : scanner) {
1572+
assertEquals(result.rawCells().length,1);
1573+
}
1574+
scanner.close();
1575+
1576+
scan = new Scan();
1577+
scan.addFamily(family.getBytes());
1578+
scan.setStartRow("scanKey1x".getBytes());
1579+
scan.setStopRow("scanKey3x".getBytes());
1580+
scan.setMaxVersions(10);
1581+
scan.setMaxResultsPerColumnFamily(2);
1582+
scan.setRowOffsetPerColumnFamily(1);
1583+
scanner = hTable.getScanner(scan);
1584+
res_count = 0;
1585+
for (Result result : scanner) {
1586+
for (KeyValue keyValue : result.raw()) {
1587+
Arrays.equals(key2.getBytes(), keyValue.getRow());
1588+
res_count += 1;
1589+
}
1590+
}
1591+
Assert.assertEquals(res_count, 3);
1592+
scanner.close();
1593+
14831594
// scan with prefixFilter
14841595
scan = new Scan();
14851596
scan.addFamily(family.getBytes());

0 commit comments

Comments
 (0)