Skip to content

ClientAsyncStreamScanner #104

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 4 commits into
base: hbase_2.0
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
38 changes: 34 additions & 4 deletions src/main/java/com/alipay/oceanbase/hbase/OHTable.java
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import com.alipay.oceanbase.hbase.exception.OperationTimeoutException;
import com.alipay.oceanbase.hbase.execute.ServerCallable;
import com.alipay.oceanbase.hbase.filter.HBaseFilterUtils;
import com.alipay.oceanbase.hbase.result.ClientAsyncStreamScanner;
import com.alipay.oceanbase.hbase.result.ClientStreamScanner;
import com.alipay.oceanbase.hbase.util.*;
import com.alipay.oceanbase.rpc.ObTableClient;
Expand Down Expand Up @@ -788,6 +789,11 @@ public ResultScanner call() throws IOException {
ObTableQueryAsyncRequest request;
ObTableQuery obTableQuery;
ObHTableFilter filter;
Boolean async = scan.isAsyncPrefetch();
if (async == null) {
async = configuration.getBoolean(
Scan.HBASE_CLIENT_SCANNER_ASYNC_PREFETCH, Scan.DEFAULT_HBASE_CLIENT_SCANNER_ASYNC_PREFETCH);
}
try {
if (scan.getFamilyMap().keySet() == null
|| scan.getFamilyMap().keySet().isEmpty()
Expand All @@ -808,8 +814,20 @@ public ResultScanner call() throws IOException {
getTargetTableName(tableNameString));
clientQueryAsyncStreamResult = (ObTableClientQueryAsyncStreamResult) obTableClient
.execute(request);
return new ClientStreamScanner(clientQueryAsyncStreamResult,
tableNameString, family, true);
if (async) {
long maxScannerResultSize;
if (scan.getMaxResultSize() > 0) {
maxScannerResultSize = scan.getMaxResultSize();
} else {
maxScannerResultSize = conf.getLong(HConstants.HBASE_CLIENT_SCANNER_MAX_RESULT_SIZE_KEY,
HConstants.DEFAULT_HBASE_CLIENT_SCANNER_MAX_RESULT_SIZE);
}
return new ClientAsyncStreamScanner(clientQueryAsyncStreamResult,
tableNameString, family, true, maxScannerResultSize);
} else {
return new ClientStreamScanner(clientQueryAsyncStreamResult,
tableNameString, family, true);
}
} else {
for (Map.Entry<byte[], NavigableSet<byte[]>> entry : scan.getFamilyMap()
.entrySet()) {
Expand All @@ -835,8 +853,20 @@ public ResultScanner call() throws IOException {
configuration));
clientQueryAsyncStreamResult = (ObTableClientQueryAsyncStreamResult) obTableClient
.execute(request);
return new ClientStreamScanner(clientQueryAsyncStreamResult,
tableNameString, family, false);
if (async) {
long maxScannerResultSize;
if (scan.getMaxResultSize() > 0) {
maxScannerResultSize = scan.getMaxResultSize();
} else {
maxScannerResultSize = conf.getLong(HConstants.HBASE_CLIENT_SCANNER_MAX_RESULT_SIZE_KEY,
HConstants.DEFAULT_HBASE_CLIENT_SCANNER_MAX_RESULT_SIZE);
}
return new ClientAsyncStreamScanner(clientQueryAsyncStreamResult,
tableNameString, family, false, maxScannerResultSize);
} else {
return new ClientStreamScanner(clientQueryAsyncStreamResult,
tableNameString, family, false);
}
}
}
} catch (Exception e) {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,243 @@
package com.alipay.oceanbase.hbase.result;

import com.alipay.oceanbase.hbase.util.OHBaseFuncUtils;
import com.alipay.oceanbase.rpc.protocol.payload.impl.ObObj;
import com.alipay.oceanbase.rpc.stream.ObTableClientQueryAsyncStreamResult;
import com.alipay.oceanbase.rpc.stream.ObTableClientQueryStreamResult;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.PrivateCellUtil;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.util.Threads;
import org.apache.hbase.thirdparty.com.google.common.annotations.VisibleForTesting;

import java.io.IOException;
import java.io.InterruptedIOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
import java.util.function.Consumer;
import java.util.concurrent.LinkedBlockingQueue;

import static com.alipay.oceanbase.hbase.util.TableHBaseLoggerFactory.LCD;

public class ClientAsyncStreamScanner extends ClientStreamScanner {
private Queue<Result> cache;
private long maxCacheSize;
private AtomicLong cacheSizeInBytes;
long maxResultSize;
// exception queue (from prefetch to main scan execution)
private Queue<Exception> exceptionsQueue;
// prefetch thread to be executed asynchronously
private Thread prefetcher;
// used for testing
private Consumer<Boolean> prefetchListener = null;
private boolean streamNext = true;

private final Lock lock = new ReentrantLock();
private final Condition notEmpty = lock.newCondition();
private final Condition notFull = lock.newCondition();

public ClientAsyncStreamScanner(ObTableClientQueryAsyncStreamResult streamResult, String tableName, byte[] family, boolean isTableGroup, long maxResultSize) throws Exception {
super(streamResult, tableName, family, isTableGroup);
this.maxResultSize = maxResultSize;
initCache();
}

public ClientAsyncStreamScanner(ObTableClientQueryStreamResult streamResult, String tableName, byte[] family, boolean isTableGroup, long maxResultSize) throws Exception {
super(streamResult, tableName, family, isTableGroup);
this.maxResultSize = maxResultSize;
initCache();
}

@VisibleForTesting
public void setPrefetchListener(Consumer<Boolean> prefetchListener) {
this.prefetchListener = prefetchListener;
}

private void initCache() {
// concurrent cache
maxCacheSize = maxResultSize > Long.MAX_VALUE / 2 ? maxResultSize : maxResultSize * 2;
cache = new LinkedBlockingQueue<>();
cacheSizeInBytes = new AtomicLong(0);
exceptionsQueue = new ConcurrentLinkedQueue<>();
prefetcher = new Thread(new PrefetchRunnable());
Threads.setDaemonThreadRunning(prefetcher, tableName + ".asyncPrefetcher");
}

private void loadCache() throws Exception {
if (streamResult.getRowIndex() == -1 && !streamResult.next()) {
streamNext = false;
return;
}

long addSize = 0;
while (!streamResult.getCacheRows().isEmpty()) {
try {
checkStatus();

List<ObObj> startRow = streamResult.getRow();

byte[][] familyAndQualifier = new byte[2][];
if (this.isTableGroup) {
// split family and qualifier
familyAndQualifier = OHBaseFuncUtils.extractFamilyFromQualifier((byte[]) startRow
.get(1).getValue());
this.family = familyAndQualifier[0];
} else {
familyAndQualifier[1] = (byte[]) startRow.get(1).getValue();
}

byte[] sk = (byte[]) startRow.get(0).getValue();
byte[] sq = familyAndQualifier[1];
long st = (Long) startRow.get(2).getValue();
byte[] sv = (byte[]) startRow.get(3).getValue();

KeyValue startKeyValue = new KeyValue(sk, family, sq, st, sv);
List<Cell> keyValues = new ArrayList<>();
keyValues.add(startKeyValue);
addSize = 0;
while ((streamNext = streamResult.next())) {
List<ObObj> row = streamResult.getRow();
if (this.isTableGroup) {
// split family and qualifier
familyAndQualifier = OHBaseFuncUtils.extractFamilyFromQualifier((byte[]) row
.get(1).getValue());
this.family = familyAndQualifier[0];
} else {
familyAndQualifier[1] = (byte[]) row.get(1).getValue();
}
byte[] k = (byte[]) row.get(0).getValue();
byte[] q = familyAndQualifier[1];
long t = (Long) row.get(2).getValue();
byte[] v = (byte[]) row.get(3).getValue();

if (Arrays.equals(sk, k)) {
// when rowKey is equal to the previous rowKey ,merge the result into the same result
KeyValue kv = new KeyValue(k, family, q, t, v);
addSize += PrivateCellUtil.estimatedSizeOfCell(kv);
keyValues.add(kv);
} else {
break;
}
}
cache.add(Result.create(keyValues));
addEstimatedSize(addSize);
} catch (Exception e) {
logger.error(LCD.convert("01-00000"), streamResult.getTableName(), e);
throw new IOException(String.format("get table %s stream next result error ",
streamResult.getTableName()), e);
}
}
}

@Override
public Result next() throws IOException {
try {
lock.lock();
while (cache.isEmpty() && streamNext) {
handleException();
if (this.closed) {
return null;
}
try {
notEmpty.await();
} catch (InterruptedException e) {
throw new InterruptedIOException("Interrupted when wait to load cache");
}
}

Result result = pollCache();
if (prefetchCondition()) {
notFull.signalAll();
}
return result;
} finally {
lock.unlock();
handleException();
}
}

@Override
public void close() {
try {
lock.lock();
super.close();
closed = true;
notFull.signalAll();
notEmpty.signalAll();
} finally {
lock.unlock();
}
}

private void addEstimatedSize(long estimatedSize) {
cacheSizeInBytes.addAndGet(estimatedSize);
}

private void handleException() throws IOException {
//The prefetch task running in the background puts any exception it
//catches into this exception queue.
// Rethrow the exception so the application can handle it.
while (!exceptionsQueue.isEmpty()) {
Exception first = exceptionsQueue.peek();
first.printStackTrace();
if (first instanceof IOException) {
throw (IOException) first;
}
throw (RuntimeException) first;
}
}

private boolean prefetchCondition() {
return cacheSizeInBytes.get() < maxCacheSize / 2;
}

private long estimatedResultSize(Result res) {
long result_size = 0;
for (Cell cell : res.rawCells()) {
result_size += PrivateCellUtil.estimatedSizeOfCell(cell);
}
return result_size;
}

private Result pollCache() {
Result res = cache.poll();
if (null != res) {
long estimatedSize = estimatedResultSize(res);
addEstimatedSize(-estimatedSize);
}
return res;
}

private class PrefetchRunnable implements Runnable {
@Override
public void run() {
while (!closed) {
boolean succeed = false;
try {
lock.lock();
while (!prefetchCondition()) {
notFull.await();
}
loadCache();
succeed = true;
} catch (Exception e) {
exceptionsQueue.add(e);
} finally {
notEmpty.signalAll();
lock.unlock();
if (prefetchListener != null) {
prefetchListener.accept(succeed);
}
}
}
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -39,18 +39,18 @@
@InterfaceAudience.Private
public class ClientStreamScanner extends AbstractClientScanner {

private static final Logger logger = TableHBaseLoggerFactory
protected static final Logger logger = TableHBaseLoggerFactory
.getLogger(ClientStreamScanner.class);

private final AbstractQueryStreamResult streamResult;
protected final AbstractQueryStreamResult streamResult;

private final String tableName;
protected final String tableName;

private byte[] family;
protected byte[] family;

private boolean closed = false;
protected boolean closed = false;

private boolean isTableGroup = false;
protected boolean isTableGroup = false;

public ClientStreamScanner(ObTableClientQueryStreamResult streamResult, String tableName,
byte[] family, boolean isTableGroup) {
Expand Down Expand Up @@ -151,7 +151,7 @@ public boolean renewLease() {
}
}

private void checkStatus() throws IllegalStateException {
void checkStatus() throws IllegalStateException {
if (closed) {
throw new IllegalStateException("table " + tableName + " family "
+ Bytes.toString(family) + " scanner is closed");
Expand Down
Loading