Skip to content

Commit fb5af0e

Browse files
committed
ClientAsyncStreamScanner
1 parent bb306d7 commit fb5af0e

File tree

3 files changed

+58
-25
lines changed

3 files changed

+58
-25
lines changed

src/main/java/com/alipay/oceanbase/hbase/OHTable.java

Lines changed: 23 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -789,6 +789,11 @@ public ResultScanner call() throws IOException {
789789
ObTableQueryAsyncRequest request;
790790
ObTableQuery obTableQuery;
791791
ObHTableFilter filter;
792+
Boolean async = scan.isAsyncPrefetch();
793+
if (async == null) {
794+
async = configuration.getBoolean(
795+
Scan.HBASE_CLIENT_SCANNER_ASYNC_PREFETCH, Scan.DEFAULT_HBASE_CLIENT_SCANNER_ASYNC_PREFETCH);
796+
}
792797
try {
793798
if (scan.getFamilyMap().keySet() == null
794799
|| scan.getFamilyMap().keySet().isEmpty()
@@ -809,9 +814,16 @@ public ResultScanner call() throws IOException {
809814
getTargetTableName(tableNameString));
810815
clientQueryAsyncStreamResult = (ObTableClientQueryAsyncStreamResult) obTableClient
811816
.execute(request);
812-
if (scan.isAsyncPrefetch()) {
817+
if (async) {
818+
long maxScannerResultSize;
819+
if (scan.getMaxResultSize() > 0) {
820+
maxScannerResultSize = scan.getMaxResultSize();
821+
} else {
822+
maxScannerResultSize = conf.getLong(HConstants.HBASE_CLIENT_SCANNER_MAX_RESULT_SIZE_KEY,
823+
HConstants.DEFAULT_HBASE_CLIENT_SCANNER_MAX_RESULT_SIZE);
824+
}
813825
return new ClientAsyncStreamScanner(clientQueryAsyncStreamResult,
814-
tableNameString, family, true, scan.getMaxResultSize());
826+
tableNameString, family, true, maxScannerResultSize);
815827
} else {
816828
return new ClientStreamScanner(clientQueryAsyncStreamResult,
817829
tableNameString, family, true);
@@ -841,9 +853,16 @@ public ResultScanner call() throws IOException {
841853
configuration));
842854
clientQueryAsyncStreamResult = (ObTableClientQueryAsyncStreamResult) obTableClient
843855
.execute(request);
844-
if (scan.isAsyncPrefetch()) {
856+
if (async) {
857+
long maxScannerResultSize;
858+
if (scan.getMaxResultSize() > 0) {
859+
maxScannerResultSize = scan.getMaxResultSize();
860+
} else {
861+
maxScannerResultSize = conf.getLong(HConstants.HBASE_CLIENT_SCANNER_MAX_RESULT_SIZE_KEY,
862+
HConstants.DEFAULT_HBASE_CLIENT_SCANNER_MAX_RESULT_SIZE);
863+
}
845864
return new ClientAsyncStreamScanner(clientQueryAsyncStreamResult,
846-
tableNameString, family, false, scan.getMaxResultSize());
865+
tableNameString, family, false, maxScannerResultSize);
847866
} else {
848867
return new ClientStreamScanner(clientQueryAsyncStreamResult,
849868
tableNameString, family, false);

src/main/java/com/alipay/oceanbase/hbase/result/ClientAsyncStreamScanner.java

Lines changed: 5 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -38,6 +38,7 @@ public class ClientAsyncStreamScanner extends ClientStreamScanner {
3838
private Thread prefetcher;
3939
// used for testing
4040
private Consumer<Boolean> prefetchListener = null;
41+
private boolean streamNext = true;
4142

4243
private final Lock lock = new ReentrantLock();
4344
private final Condition notEmpty = lock.newCondition();
@@ -47,14 +48,12 @@ public ClientAsyncStreamScanner(ObTableClientQueryAsyncStreamResult streamResult
4748
super(streamResult, tableName, family, isTableGroup);
4849
this.maxResultSize = maxResultSize;
4950
initCache();
50-
loadCache();
5151
}
5252

5353
public ClientAsyncStreamScanner(ObTableClientQueryStreamResult streamResult, String tableName, byte[] family, boolean isTableGroup, long maxResultSize) throws Exception {
5454
super(streamResult, tableName, family, isTableGroup);
5555
this.maxResultSize = maxResultSize;
5656
initCache();
57-
loadCache();
5857
}
5958

6059
@VisibleForTesting
@@ -74,6 +73,7 @@ private void initCache() {
7473

7574
private void loadCache() throws Exception {
7675
if (streamResult.getRowIndex() == -1 && !streamResult.next()) {
76+
streamNext = false;
7777
return;
7878
}
7979

@@ -82,15 +82,7 @@ private void loadCache() throws Exception {
8282
try {
8383
checkStatus();
8484

85-
List<ObObj> startRow;
86-
87-
if (streamResult.getRowIndex() != -1) {
88-
startRow = streamResult.getRow();
89-
} else if (streamResult.next()) {
90-
startRow = streamResult.getRow();
91-
} else {
92-
return;
93-
}
85+
List<ObObj> startRow = streamResult.getRow();
9486

9587
byte[][] familyAndQualifier = new byte[2][];
9688
if (this.isTableGroup) {
@@ -111,7 +103,7 @@ private void loadCache() throws Exception {
111103
List<Cell> keyValues = new ArrayList<>();
112104
keyValues.add(startKeyValue);
113105
addSize = 0;
114-
while (streamResult.next()) {
106+
while ((streamNext = streamResult.next())) {
115107
List<ObObj> row = streamResult.getRow();
116108
if (this.isTableGroup) {
117109
// split family and qualifier
@@ -149,7 +141,7 @@ private void loadCache() throws Exception {
149141
public Result next() throws IOException {
150142
try {
151143
lock.lock();
152-
while (cache.isEmpty()) {
144+
while (cache.isEmpty() && streamNext) {
153145
handleException();
154146
if (this.closed) {
155147
return null;

src/test/java/com/alipay/oceanbase/hbase/HTableTestBase.java

Lines changed: 30 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -5631,14 +5631,28 @@ public void testScannerMultiVersion() throws Exception {
56315631
}
56325632

56335633
@Test
5634-
public void testAsyncPrefetchScanner() throws IOException {
5634+
public void testAsyncPrefetchScanner1() throws IOException {
56355635
testAsyncPrefetchScannerInner(40, 40, null);
5636+
}
5637+
@Test
5638+
public void testAsyncPrefetchScanner2() throws IOException {
5639+
testAsyncPrefetchScannerInner(4000, 3, null);
5640+
}
5641+
@Test
5642+
public void testAsyncPrefetchScanner3() throws IOException {
5643+
testAsyncPrefetchScannerInner(3, 4000, null);
5644+
}
5645+
@Test
5646+
public void testAsyncPrefetchScanner4() throws IOException {
56365647
testAsyncPrefetchScannerInner(40, 40, (b) -> {
56375648
try {
56385649
TimeUnit.MILLISECONDS.sleep(500);
56395650
} catch (InterruptedException ignored) {
56405651
}
56415652
});
5653+
}
5654+
@Test
5655+
public void testAsyncPrefetchScanner5() throws IOException {
56425656
testAsyncPrefetchScannerInner(40, 40, (b) -> {
56435657
System.out.println("prefetch status: " + b);
56445658
});
@@ -5650,17 +5664,25 @@ public void testAsyncPrefetchScannerInner(int row_count, int column_count, Consu
56505664
String value = "value";
56515665
String family = "family1";
56525666
Put put;
5667+
List<Put> puts = new ArrayList<>();
56535668
for (int i = 0; i < row_count; i++) {
5654-
String k = key + i;
5669+
String k = key + String.format("%05d", i);
56555670
for (int j = 0; j < column_count; j++) {
56565671
put = new Put(k.getBytes());
5657-
put.addColumn(family.getBytes(), Bytes.toBytes(column + j), (value + j).getBytes());
5658-
hTable.put(put);
5672+
put.addColumn(family.getBytes(), Bytes.toBytes(column + String.format("%05d", j)), (value + String.format("%05d", j)).getBytes());
5673+
puts.add(put);
5674+
if (puts.size() > 1000) {
5675+
hTable.put(puts);
5676+
puts.clear();
5677+
}
56595678
}
56605679
}
5680+
hTable.put(puts);
5681+
puts.clear();
56615682

56625683
Scan scan = new Scan();
56635684
scan.readVersions(10);
5685+
scan.addFamily(family.getBytes());
56645686
scan.setAsyncPrefetch(true);
56655687
ResultScanner scanner = hTable.getScanner(scan);
56665688
assertTrue(scanner instanceof ClientAsyncStreamScanner);
@@ -5669,10 +5691,10 @@ public void testAsyncPrefetchScannerInner(int row_count, int column_count, Consu
56695691
int count = 0;
56705692
for (Result res: scanner) {
56715693
for (Cell cell: res.rawCells()) {
5672-
int rowId = count / row_count;
5673-
int columnId = count % row_count;
5674-
Assert.assertEquals((key + rowId).getBytes(), CellUtil.cloneRow(cell));
5675-
Assert.assertEquals((column + columnId).getBytes(), CellUtil.cloneQualifier(cell));
5694+
int rowId = count / column_count;
5695+
int columnId = count % column_count;
5696+
Assert.assertEquals(key + String.format("%05d", rowId), Bytes.toString(CellUtil.cloneRow(cell)));
5697+
Assert.assertEquals(column + String.format("%05d", columnId), Bytes.toString(CellUtil.cloneQualifier(cell)));
56765698
count ++;
56775699
}
56785700
}

0 commit comments

Comments
 (0)