Skip to content

Commit 28dbab0

Browse files
committed
support allowPatialResult (#56)
1 parent 65cc24a commit 28dbab0

File tree

7 files changed

+444
-118
lines changed

7 files changed

+444
-118
lines changed
Binary file not shown.

pom.xml

Lines changed: 12 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -58,6 +58,13 @@
5858
</properties>
5959

6060
<dependencies>
61+
<dependency>
62+
<groupId>com.oceanbase</groupId>
63+
<artifactId>obkv-table-client</artifactId>
64+
<version>1.2.131-SNAPSHOT</version>
65+
<scope>system</scope>
66+
<systemPath>${project.basedir}/obkv-table-client-1.2.131-SNAPSHOT-jar-with-dependencies.jar</systemPath>
67+
</dependency>
6168
<dependency>
6269
<groupId>org.apache.hadoop</groupId>
6370
<artifactId>hadoop-common</artifactId>
@@ -109,11 +116,11 @@
109116
</exclusion>
110117
</exclusions>
111118
</dependency>
112-
<dependency>
113-
<groupId>com.oceanbase</groupId>
114-
<artifactId>obkv-table-client</artifactId>
115-
<version>${table.client.version}</version>
116-
</dependency>
119+
<!-- <dependency>-->
120+
<!-- <groupId>com.oceanbase</groupId>-->
121+
<!-- <artifactId>obkv-table-client</artifactId>-->
122+
<!-- <version>${table.client.version}</version>-->
123+
<!-- </dependency>-->
117124
<dependency>
118125
<groupId>org.apache.hbase</groupId>
119126
<artifactId>hbase-client</artifactId>

src/main/java/com/alipay/oceanbase/hbase/OHTable.java

Lines changed: 90 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -36,6 +36,8 @@
3636
import com.alipay.oceanbase.rpc.protocol.payload.impl.execute.syncquery.ObTableQueryAsyncRequest;
3737
import com.alipay.oceanbase.rpc.stream.ObTableClientQueryAsyncStreamResult;
3838
import com.alipay.oceanbase.rpc.stream.ObTableClientQueryStreamResult;
39+
import com.alipay.oceanbase.rpc.table.ObHBaseParams;
40+
import com.alipay.oceanbase.rpc.table.ObKVParams;
3941
import com.alipay.sofa.common.thread.SofaThreadPoolExecutor;
4042

4143
import com.google.protobuf.Descriptors;
@@ -99,6 +101,11 @@ public class OHTable implements HTableInterface {
99101
*/
100102
private int operationTimeout;
101103

104+
/**
105+
* timeout for each rpc request
106+
*/
107+
private int rpcTimeout;
108+
102109
/**
103110
* if the <code>Get</code> executing pool is specified by user cleanupPoolOnClose will be false ,
104111
* which means that user is responsible for the pool
@@ -165,6 +172,8 @@ public class OHTable implements HTableInterface {
165172
*/
166173
private final Configuration configuration;
167174

175+
private int scannerTimeout;
176+
168177
/**
169178
* Creates an object to access a HBase table.
170179
* Shares oceanbase table obTableClient and other resources with other OHTable instances
@@ -290,7 +299,7 @@ public OHTable(TableName tableName, Connection connection,
290299
} else {
291300
this.cleanupPoolOnClose = false;
292301
}
293-
302+
this.rpcTimeout = connectionConfig.getRpcTimeout();
294303
this.operationTimeout = connectionConfig.getOperationTimeout();
295304
this.operationExecuteInPool = this.configuration.getBoolean(
296305
HBASE_CLIENT_OPERATION_EXECUTE_IN_POOL,
@@ -338,6 +347,12 @@ private void finishSetUp() {
338347
checkArgument(configuration != null, "configuration is null.");
339348
checkArgument(tableName != null, "tableNameString is null.");
340349
checkArgument(tableNameString != null, "tableNameString is null.");
350+
this.scannerTimeout = HBaseConfiguration.getInt(configuration,
351+
HConstants.HBASE_CLIENT_SCANNER_TIMEOUT_PERIOD,
352+
HConstants.HBASE_REGIONSERVER_LEASE_PERIOD_KEY,
353+
HConstants.DEFAULT_HBASE_CLIENT_SCANNER_TIMEOUT_PERIOD);
354+
this.rpcTimeout = configuration.getInt(HConstants.HBASE_RPC_TIMEOUT_KEY,
355+
HConstants.DEFAULT_HBASE_RPC_TIMEOUT);
341356
this.operationTimeout = this.configuration.getInt(
342357
HConstants.HBASE_CLIENT_OPERATION_TIMEOUT,
343358
HConstants.DEFAULT_HBASE_CLIENT_OPERATION_TIMEOUT);
@@ -390,13 +405,29 @@ public boolean exists(Get get) throws IOException {
390405
}
391406

392407
@Override
393-
public boolean[] existsAll(List<Get> list) throws IOException {
394-
throw new FeatureNotSupportedException("not supported yet.");
408+
public boolean[] existsAll(List<Get> gets) throws IOException {
409+
if (gets.isEmpty()) {
410+
return new boolean[] {};
411+
}
412+
if (gets.size() == 1) {
413+
return new boolean[] { exists(gets.get(0)) };
414+
}
415+
Result[] r = get(gets);
416+
boolean[] ret = new boolean[r.length];
417+
for (int i = 0; i < gets.size(); ++i) {
418+
ret[i] = exists(gets.get(i));
419+
}
420+
return ret;
395421
}
396422

397423
@Override
398424
public Boolean[] exists(List<Get> gets) throws IOException {
399-
throw new FeatureNotSupportedException("not supported yet'");
425+
boolean[] results = existsAll(gets);
426+
Boolean[] objectResults = new Boolean[results.length];
427+
for (int i = 0; i < results.length; ++i) {
428+
objectResults[i] = results[i];
429+
}
430+
return objectResults;
400431
}
401432

402433
@Override
@@ -478,8 +509,18 @@ public Result call() throws IOException {
478509
|| get.getFamilyMap().keySet().size() == 0) {
479510
filter = buildObHTableFilter(get.getFilter(), get.getTimeRange(),
480511
get.getMaxVersions(), null);
481-
obTableQuery = buildObTableQuery(filter, get.getRow(), true, get.getRow(),
482-
true);
512+
if (get.isClosestRowBefore()) {
513+
Scan scan = new Scan();
514+
scan.setStartRow(get.getRow());
515+
scan.setCaching(1);
516+
scan.setReversed(true);
517+
obTableQuery = buildObTableQuery(filter, scan);
518+
obTableQuery.setObKVParams(buildOBKVParams(scan));
519+
} else {
520+
obTableQuery = buildObTableQuery(filter, get.getRow(), true,
521+
get.getRow(), true);
522+
obTableQuery.setObKVParams(buildOBKVParams(get));
523+
}
483524
request = buildObTableQueryRequest(obTableQuery,
484525
getTargetTableName(tableNameString));
485526

@@ -492,10 +533,17 @@ public Result call() throws IOException {
492533
family = entry.getKey();
493534
filter = buildObHTableFilter(get.getFilter(), get.getTimeRange(),
494535
get.getMaxVersions(), entry.getValue());
495-
496-
obTableQuery = buildObTableQuery(filter, get.getRow(), true,
497-
get.getRow(), true);
498-
536+
if (get.isClosestRowBefore()) {
537+
Scan scan = new Scan(get.getRow());
538+
scan.setCaching(1);
539+
scan.setReversed(true);
540+
obTableQuery = buildObTableQuery(filter, scan);
541+
obTableQuery.setObKVParams(buildOBKVParams(scan));
542+
} else {
543+
obTableQuery = buildObTableQuery(filter, get.getRow(), true,
544+
get.getRow(), true);
545+
obTableQuery.setObKVParams(buildOBKVParams(get));
546+
}
499547
request = buildObTableQueryRequest(obTableQuery,
500548
getTargetTableName(tableNameString, Bytes.toString(family)));
501549
clientQueryStreamResult = (ObTableClientQueryStreamResult) obTableClient
@@ -661,6 +709,28 @@ private void validatePut(Put put) {
661709
}
662710
}
663711

712+
private ObKVParams buildOBKVParams(final Scan scan) {
713+
ObKVParams obKVParams = new ObKVParams();
714+
ObHBaseParams obHBaseParams = new ObHBaseParams();
715+
if (scan != null) {
716+
obHBaseParams.setCaching(scan.getCaching());
717+
obHBaseParams.setCallTimeout(scannerTimeout);
718+
obHBaseParams.setCacheBlock(scan.isGetScan());
719+
obHBaseParams.setAllowPartialResults(scan.getAllowPartialResults());
720+
}
721+
obKVParams.setObParamsBase(obHBaseParams);
722+
return obKVParams;
723+
}
724+
725+
private ObKVParams buildOBKVParams(final Get get) {
726+
ObKVParams obKVParams = new ObKVParams();
727+
ObHBaseParams obHBaseParams = new ObHBaseParams();
728+
obHBaseParams.setCheckExistenceOnly(get.isCheckExistenceOnly());
729+
obHBaseParams.setCacheBlock(get.getCacheBlocks());
730+
obKVParams.setObParamsBase(obHBaseParams);
731+
return obKVParams;
732+
}
733+
664734
/**
665735
* 例如当 key="key001", family = "family", c1="a" 时,才执行 put 操作,该命令是原子的
666736
* @param row row
@@ -1202,22 +1272,22 @@ public void setOperationTimeout(int operationTimeout) {
12021272
(this.operationTimeout != HConstants.DEFAULT_HBASE_CLIENT_OPERATION_TIMEOUT));
12031273
}
12041274

1205-
// todo
12061275
@Override
12071276
public int getOperationTimeout() {
1208-
throw new FeatureNotSupportedException("not supported yet.");
1277+
return operationTimeout;
12091278
}
12101279

12111280
//todo
12121281
@Override
1213-
public void setRpcTimeout(int i) {
1214-
throw new FeatureNotSupportedException("not supported yet.");
1282+
public void setRpcTimeout(int rpcTimeout) {
1283+
this.rpcTimeout = rpcTimeout;
1284+
obTableClient.setRpcExecuteTimeout(rpcTimeout);
12151285
}
12161286

12171287
// todo
12181288
@Override
12191289
public int getRpcTimeout() {
1220-
throw new FeatureNotSupportedException("not supported yet.");
1290+
return this.rpcTimeout;
12211291
}
12221292

12231293
public void setRuntimeBatchExecutor(ExecutorService runtimeBatchExecutor) {
@@ -1376,7 +1446,6 @@ private ObTableQuery buildObTableQuery(ObHTableFilter filter, byte[] start,
13761446
obTableQuery.addSelectColumn(column);
13771447
}
13781448
obTableQuery.addKeyRange(obNewRange);
1379-
13801449
return obTableQuery;
13811450
}
13821451

@@ -1391,16 +1460,18 @@ private ObTableQuery buildObTableQuery(ObHTableFilter filter, final Scan scan) {
13911460
if (scan.isReversed()) {
13921461
obTableQuery = buildObTableQuery(filter, scan.getStopRow(), false, scan.getStartRow(),
13931462
true);
1463+
obTableQuery.setScanOrder(ObScanOrder.Reverse);
13941464
} else {
13951465
obTableQuery = buildObTableQuery(filter, scan.getStartRow(), true, scan.getStopRow(),
13961466
false);
13971467
}
1398-
if (scan.isReversed()) { // reverse scan 时设置为逆序
1399-
obTableQuery.setScanOrder(ObScanOrder.Reverse);
1400-
}
14011468
if (scan.getBatch() > 0) {
14021469
obTableQuery.setBatchSize(scan.getBatch());
14031470
}
1471+
obTableQuery.setMaxResultSize(scan.getMaxResultSize() > 0 ? scan.getMaxResultSize()
1472+
: configuration.getLong(HConstants.HBASE_CLIENT_SCANNER_MAX_RESULT_SIZE_KEY,
1473+
HConstants.DEFAULT_HBASE_CLIENT_SCANNER_MAX_RESULT_SIZE));
1474+
obTableQuery.setObKVParams(buildOBKVParams(scan));
14041475
return obTableQuery;
14051476
}
14061477

src/main/java/com/alipay/oceanbase/hbase/filter/HBaseFilterUtils.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -121,10 +121,10 @@ private static String toParseableString(PageFilter filter) {
121121
private static String toParseableString(ColumnPaginationFilter filter) {
122122
if (filter.getColumnOffset() != null) {
123123
return filter.getClass().getSimpleName() + '(' + filter.getLimit() + ",'"
124-
+ Bytes.toString(filter.getColumnOffset()) + "')";
124+
+ Bytes.toString(filter.getColumnOffset()) + "')";
125125
} else {
126126
return filter.getClass().getSimpleName() + '(' + filter.getLimit() + ','
127-
+ filter.getOffset() + ')';
127+
+ filter.getOffset() + ')';
128128
}
129129
}
130130

src/main/java/com/alipay/oceanbase/hbase/result/ClientStreamScanner.java

Lines changed: 4 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -31,9 +31,7 @@
3131
import org.apache.hadoop.hbase.util.Bytes;
3232
import org.slf4j.Logger;
3333
import java.io.IOException;
34-
import java.util.ArrayList;
35-
import java.util.Arrays;
36-
import java.util.List;
34+
import java.util.*;
3735

3836
import static com.alipay.oceanbase.hbase.util.TableHBaseLoggerFactory.LCD;
3937

@@ -75,16 +73,8 @@ public ClientStreamScanner(ObTableClientQueryAsyncStreamResult streamResult, Str
7573
public Result next() throws IOException {
7674
try {
7775
checkStatus();
78-
79-
if (!streamNext) {
80-
return null;
81-
}
82-
8376
List<ObObj> startRow;
84-
85-
if (streamResult.getRowIndex() != -1) {
86-
startRow = streamResult.getRow();
87-
} else if (streamResult.next()) {
77+
if (streamResult.next()) {
8878
startRow = streamResult.getRow();
8979
} else {
9080
return null;
@@ -108,8 +98,7 @@ public Result next() throws IOException {
10898
KeyValue startKeyValue = new KeyValue(sk, family, sq, st, sv);
10999
List<KeyValue> keyValues = new ArrayList<KeyValue>();
110100
keyValues.add(startKeyValue);
111-
112-
while (streamNext = streamResult.next()) {
101+
while (!streamResult.getCacheRows().isEmpty() && streamResult.next()) {
113102
List<ObObj> row = streamResult.getRow();
114103
if (this.isTableGroup) {
115104
// split family and qualifier
@@ -127,6 +116,7 @@ public Result next() throws IOException {
127116
// when rowKey is equal to the previous rowKey ,merge the result into the same result
128117
keyValues.add(new KeyValue(k, family, q, t, v));
129118
} else {
119+
streamResult.getCacheRows().addFirst(row);
130120
break;
131121
}
132122
}

0 commit comments

Comments
 (0)