Skip to content

Commit c18217f

Browse files
committed
ClientAsyncStreamScanner
1 parent 8751fdc commit c18217f

File tree

4 files changed

+324
-11
lines changed

4 files changed

+324
-11
lines changed

src/main/java/com/alipay/oceanbase/hbase/OHTable.java

Lines changed: 15 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@
2121
import com.alipay.oceanbase.hbase.exception.OperationTimeoutException;
2222
import com.alipay.oceanbase.hbase.execute.ServerCallable;
2323
import com.alipay.oceanbase.hbase.filter.HBaseFilterUtils;
24+
import com.alipay.oceanbase.hbase.result.ClientAsyncStreamScanner;
2425
import com.alipay.oceanbase.hbase.result.ClientStreamScanner;
2526
import com.alipay.oceanbase.hbase.util.*;
2627
import com.alipay.oceanbase.rpc.ObTableClient;
@@ -731,8 +732,13 @@ public ResultScanner call() throws IOException {
731732
getTargetTableName(tableNameString));
732733
clientQueryAsyncStreamResult = (ObTableClientQueryAsyncStreamResult) obTableClient
733734
.execute(request);
734-
return new ClientStreamScanner(clientQueryAsyncStreamResult,
735-
tableNameString, family, true);
735+
if (scan.isAsyncPrefetch()) {
736+
return new ClientAsyncStreamScanner(clientQueryAsyncStreamResult,
737+
tableNameString, family, true, scan.getMaxResultSize());
738+
} else {
739+
return new ClientStreamScanner(clientQueryAsyncStreamResult,
740+
tableNameString, family, true);
741+
}
736742
} else {
737743
for (Map.Entry<byte[], NavigableSet<byte[]>> entry : scan.getFamilyMap()
738744
.entrySet()) {
@@ -758,8 +764,13 @@ public ResultScanner call() throws IOException {
758764
configuration));
759765
clientQueryAsyncStreamResult = (ObTableClientQueryAsyncStreamResult) obTableClient
760766
.execute(request);
761-
return new ClientStreamScanner(clientQueryAsyncStreamResult,
762-
tableNameString, family, false);
767+
if (scan.isAsyncPrefetch()) {
768+
return new ClientAsyncStreamScanner(clientQueryAsyncStreamResult,
769+
tableNameString, family, false, scan.getMaxResultSize());
770+
} else {
771+
return new ClientStreamScanner(clientQueryAsyncStreamResult,
772+
tableNameString, family, false);
773+
}
763774
}
764775
}
765776
} catch (Exception e) {
Lines changed: 251 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,251 @@
1+
package com.alipay.oceanbase.hbase.result;
2+
3+
import com.alipay.oceanbase.hbase.util.OHBaseFuncUtils;
4+
import com.alipay.oceanbase.rpc.protocol.payload.impl.ObObj;
5+
import com.alipay.oceanbase.rpc.stream.ObTableClientQueryAsyncStreamResult;
6+
import com.alipay.oceanbase.rpc.stream.ObTableClientQueryStreamResult;
7+
import org.apache.hadoop.hbase.Cell;
8+
import org.apache.hadoop.hbase.KeyValue;
9+
import org.apache.hadoop.hbase.PrivateCellUtil;
10+
import org.apache.hadoop.hbase.client.Result;
11+
import org.apache.hadoop.hbase.util.Threads;
12+
import org.apache.hbase.thirdparty.com.google.common.annotations.VisibleForTesting;
13+
14+
import java.io.IOException;
15+
import java.io.InterruptedIOException;
16+
import java.util.ArrayList;
17+
import java.util.Arrays;
18+
import java.util.List;
19+
import java.util.Queue;
20+
import java.util.concurrent.ConcurrentLinkedQueue;
21+
import java.util.concurrent.atomic.AtomicLong;
22+
import java.util.concurrent.locks.Condition;
23+
import java.util.concurrent.locks.Lock;
24+
import java.util.concurrent.locks.ReentrantLock;
25+
import java.util.function.Consumer;
26+
import java.util.concurrent.LinkedBlockingQueue;
27+
28+
import static com.alipay.oceanbase.hbase.util.TableHBaseLoggerFactory.LCD;
29+
30+
public class ClientAsyncStreamScanner extends ClientStreamScanner {
31+
private Queue<Result> cache;
32+
private long maxCacheSize;
33+
private AtomicLong cacheSizeInBytes;
34+
long maxResultSize;
35+
// exception queue (from prefetch to main scan execution)
36+
private Queue<Exception> exceptionsQueue;
37+
// prefetch thread to be executed asynchronously
38+
private Thread prefetcher;
39+
// used for testing
40+
private Consumer<Boolean> prefetchListener = null;
41+
42+
private final Lock lock = new ReentrantLock();
43+
private final Condition notEmpty = lock.newCondition();
44+
private final Condition notFull = lock.newCondition();
45+
46+
public ClientAsyncStreamScanner(ObTableClientQueryAsyncStreamResult streamResult, String tableName, byte[] family, boolean isTableGroup, long maxResultSize) throws Exception {
47+
super(streamResult, tableName, family, isTableGroup);
48+
this.maxResultSize = maxResultSize;
49+
initCache();
50+
loadCache();
51+
}
52+
53+
public ClientAsyncStreamScanner(ObTableClientQueryStreamResult streamResult, String tableName, byte[] family, boolean isTableGroup, long maxResultSize) throws Exception {
54+
super(streamResult, tableName, family, isTableGroup);
55+
this.maxResultSize = maxResultSize;
56+
initCache();
57+
loadCache();
58+
}
59+
60+
@VisibleForTesting
61+
public void setPrefetchListener(Consumer<Boolean> prefetchListener) {
62+
this.prefetchListener = prefetchListener;
63+
}
64+
65+
private void initCache() {
66+
// concurrent cache
67+
maxCacheSize = maxResultSize > Long.MAX_VALUE / 2 ? maxResultSize : maxResultSize * 2;
68+
cache = new LinkedBlockingQueue<>();
69+
cacheSizeInBytes = new AtomicLong(0);
70+
exceptionsQueue = new ConcurrentLinkedQueue<>();
71+
prefetcher = new Thread(new PrefetchRunnable());
72+
Threads.setDaemonThreadRunning(prefetcher, tableName + ".asyncPrefetcher");
73+
}
74+
75+
private void loadCache() throws Exception {
76+
if (streamResult.getRowIndex() == -1 && !streamResult.next()) {
77+
return;
78+
}
79+
80+
long addSize = 0;
81+
while (!streamResult.getCacheRows().isEmpty()) {
82+
try {
83+
checkStatus();
84+
85+
List<ObObj> startRow;
86+
87+
if (streamResult.getRowIndex() != -1) {
88+
startRow = streamResult.getRow();
89+
} else if (streamResult.next()) {
90+
startRow = streamResult.getRow();
91+
} else {
92+
return;
93+
}
94+
95+
byte[][] familyAndQualifier = new byte[2][];
96+
if (this.isTableGroup) {
97+
// split family and qualifier
98+
familyAndQualifier = OHBaseFuncUtils.extractFamilyFromQualifier((byte[]) startRow
99+
.get(1).getValue());
100+
this.family = familyAndQualifier[0];
101+
} else {
102+
familyAndQualifier[1] = (byte[]) startRow.get(1).getValue();
103+
}
104+
105+
byte[] sk = (byte[]) startRow.get(0).getValue();
106+
byte[] sq = familyAndQualifier[1];
107+
long st = (Long) startRow.get(2).getValue();
108+
byte[] sv = (byte[]) startRow.get(3).getValue();
109+
110+
KeyValue startKeyValue = new KeyValue(sk, family, sq, st, sv);
111+
List<Cell> keyValues = new ArrayList<>();
112+
keyValues.add(startKeyValue);
113+
addSize = 0;
114+
while (streamResult.next()) {
115+
List<ObObj> row = streamResult.getRow();
116+
if (this.isTableGroup) {
117+
// split family and qualifier
118+
familyAndQualifier = OHBaseFuncUtils.extractFamilyFromQualifier((byte[]) row
119+
.get(1).getValue());
120+
this.family = familyAndQualifier[0];
121+
} else {
122+
familyAndQualifier[1] = (byte[]) row.get(1).getValue();
123+
}
124+
byte[] k = (byte[]) row.get(0).getValue();
125+
byte[] q = familyAndQualifier[1];
126+
long t = (Long) row.get(2).getValue();
127+
byte[] v = (byte[]) row.get(3).getValue();
128+
129+
if (Arrays.equals(sk, k)) {
130+
// when rowKey is equal to the previous rowKey ,merge the result into the same result
131+
KeyValue kv = new KeyValue(k, family, q, t, v);
132+
addSize += PrivateCellUtil.estimatedSizeOfCell(kv);
133+
keyValues.add(kv);
134+
} else {
135+
break;
136+
}
137+
}
138+
cache.add(Result.create(keyValues));
139+
addEstimatedSize(addSize);
140+
} catch (Exception e) {
141+
logger.error(LCD.convert("01-00000"), streamResult.getTableName(), e);
142+
throw new IOException(String.format("get table %s stream next result error ",
143+
streamResult.getTableName()), e);
144+
}
145+
}
146+
}
147+
148+
@Override
149+
public Result next() throws IOException {
150+
try {
151+
lock.lock();
152+
while (cache.isEmpty()) {
153+
handleException();
154+
if (this.closed) {
155+
return null;
156+
}
157+
try {
158+
notEmpty.await();
159+
} catch (InterruptedException e) {
160+
throw new InterruptedIOException("Interrupted when wait to load cache");
161+
}
162+
}
163+
164+
Result result = pollCache();
165+
if (prefetchCondition()) {
166+
notFull.signalAll();
167+
}
168+
return result;
169+
} finally {
170+
lock.unlock();
171+
handleException();
172+
}
173+
}
174+
175+
@Override
176+
public void close() {
177+
try {
178+
lock.lock();
179+
super.close();
180+
closed = true;
181+
notFull.signalAll();
182+
notEmpty.signalAll();
183+
} finally {
184+
lock.unlock();
185+
}
186+
}
187+
188+
private void addEstimatedSize(long estimatedSize) {
189+
cacheSizeInBytes.addAndGet(estimatedSize);
190+
}
191+
192+
private void handleException() throws IOException {
193+
//The prefetch task running in the background puts any exception it
194+
//catches into this exception queue.
195+
// Rethrow the exception so the application can handle it.
196+
while (!exceptionsQueue.isEmpty()) {
197+
Exception first = exceptionsQueue.peek();
198+
first.printStackTrace();
199+
if (first instanceof IOException) {
200+
throw (IOException) first;
201+
}
202+
throw (RuntimeException) first;
203+
}
204+
}
205+
206+
private boolean prefetchCondition() {
207+
return cacheSizeInBytes.get() < maxCacheSize / 2;
208+
}
209+
210+
private long estimatedResultSize(Result res) {
211+
long result_size = 0;
212+
for (Cell cell : res.rawCells()) {
213+
result_size += PrivateCellUtil.estimatedSizeOfCell(cell);
214+
}
215+
return result_size;
216+
}
217+
218+
private Result pollCache() {
219+
Result res = cache.poll();
220+
if (null != res) {
221+
long estimatedSize = estimatedResultSize(res);
222+
addEstimatedSize(-estimatedSize);
223+
}
224+
return res;
225+
}
226+
227+
private class PrefetchRunnable implements Runnable {
228+
@Override
229+
public void run() {
230+
while (!closed) {
231+
boolean succeed = false;
232+
try {
233+
lock.lock();
234+
while (!prefetchCondition()) {
235+
notFull.await();
236+
}
237+
loadCache();
238+
succeed = true;
239+
} catch (Exception e) {
240+
exceptionsQueue.add(e);
241+
} finally {
242+
notEmpty.signalAll();
243+
lock.unlock();
244+
if (prefetchListener != null) {
245+
prefetchListener.accept(succeed);
246+
}
247+
}
248+
}
249+
}
250+
}
251+
}

src/main/java/com/alipay/oceanbase/hbase/result/ClientStreamScanner.java

Lines changed: 7 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -39,18 +39,18 @@
3939
@InterfaceAudience.Private
4040
public class ClientStreamScanner extends AbstractClientScanner {
4141

42-
private static final Logger logger = TableHBaseLoggerFactory
42+
protected static final Logger logger = TableHBaseLoggerFactory
4343
.getLogger(ClientStreamScanner.class);
4444

45-
private final AbstractQueryStreamResult streamResult;
45+
protected final AbstractQueryStreamResult streamResult;
4646

47-
private final String tableName;
47+
protected final String tableName;
4848

49-
private byte[] family;
49+
protected byte[] family;
5050

51-
private boolean closed = false;
51+
protected boolean closed = false;
5252

53-
private boolean isTableGroup = false;
53+
protected boolean isTableGroup = false;
5454

5555
public ClientStreamScanner(ObTableClientQueryStreamResult streamResult, String tableName,
5656
byte[] family, boolean isTableGroup) {
@@ -151,7 +151,7 @@ public boolean renewLease() {
151151
}
152152
}
153153

154-
private void checkStatus() throws IllegalStateException {
154+
void checkStatus() throws IllegalStateException {
155155
if (closed) {
156156
throw new IllegalStateException("table " + tableName + " family "
157157
+ Bytes.toString(family) + " scanner is closed");

src/test/java/com/alipay/oceanbase/hbase/HTableTestBase.java

Lines changed: 51 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@
1818
package com.alipay.oceanbase.hbase;
1919

2020
import com.alipay.oceanbase.hbase.exception.FeatureNotSupportedException;
21+
import com.alipay.oceanbase.hbase.result.ClientAsyncStreamScanner;
2122
import org.apache.hadoop.hbase.*;
2223
import org.apache.hadoop.hbase.client.*;
2324
import org.apache.hadoop.hbase.filter.*;
@@ -35,6 +36,8 @@
3536
import java.nio.ByteBuffer;
3637
import java.nio.charset.StandardCharsets;
3738
import java.util.*;
39+
import java.util.concurrent.TimeUnit;
40+
import java.util.function.Consumer;
3841

3942
import static org.apache.hadoop.hbase.filter.FilterList.Operator.MUST_PASS_ALL;
4043
import static org.apache.hadoop.hbase.filter.FilterList.Operator.MUST_PASS_ONE;
@@ -5325,6 +5328,54 @@ public void testScannerMultiVersion() throws Exception {
53255328
Assert.assertEquals(2, resultList.size());
53265329
}
53275330

5331+
@Test
5332+
public void testAsyncPrefetchScanner() throws IOException {
5333+
testAsyncPrefetchScannerInner(null);
5334+
testAsyncPrefetchScannerInner((b) -> {
5335+
try {
5336+
TimeUnit.MILLISECONDS.sleep(500);
5337+
} catch (InterruptedException ignored) {
5338+
}
5339+
});
5340+
}
5341+
5342+
public void testAsyncPrefetchScannerInner(Consumer<Boolean> listener) throws IOException {
5343+
String key = "async_scanner";
5344+
String column = "column";
5345+
String value = "value";
5346+
String family = "family1";
5347+
Put put;
5348+
int row_count = 40;
5349+
int column_count = 40;
5350+
for (int i = 0; i < row_count; i++) {
5351+
String k = key + i;
5352+
for (int j = 0; j < column_count; j++) {
5353+
put = new Put(k.getBytes());
5354+
put.addColumn(family.getBytes(), Bytes.toBytes(column + j), (value + j).getBytes());
5355+
hTable.put(put);
5356+
}
5357+
}
5358+
5359+
Scan scan = new Scan();
5360+
scan.readVersions(10);
5361+
scan.setAsyncPrefetch(true);
5362+
ResultScanner scanner = hTable.getScanner(scan);
5363+
assertTrue(scanner instanceof ClientAsyncStreamScanner);
5364+
((ClientAsyncStreamScanner) scanner).setPrefetchListener(listener);
5365+
5366+
int count = 0;
5367+
for (Result res: scanner) {
5368+
for (Cell cell: res.rawCells()) {
5369+
int rowId = count / row_count;
5370+
int columnId = count % row_count;
5371+
Assert.assertEquals((key + rowId).getBytes(), CellUtil.cloneRow(cell));
5372+
Assert.assertEquals((column + columnId).getBytes(), CellUtil.cloneQualifier(cell));
5373+
count ++;
5374+
}
5375+
}
5376+
assertEquals(row_count * column_count, count);
5377+
}
5378+
53285379
@Test
53295380
public void testPutColumnFamilyNull() throws Exception {
53305381
Put put1 = new Put(("key_c_f").getBytes());

0 commit comments

Comments
 (0)