28
28
import com .alipay .oceanbase .rpc .ObTableClient ;
29
29
import com .alipay .oceanbase .rpc .mutation .BatchOperation ;
30
30
import com .alipay .oceanbase .rpc .mutation .result .BatchOperationResult ;
31
+ import com .alipay .oceanbase .rpc .property .Property ;
31
32
import com .alipay .oceanbase .rpc .protocol .payload .impl .ObObj ;
32
33
import com .alipay .oceanbase .rpc .protocol .payload .impl .ObRowKey ;
33
34
import com .alipay .oceanbase .rpc .protocol .payload .impl .execute .*;
39
40
import com .alipay .oceanbase .rpc .stream .ObTableClientQueryAsyncStreamResult ;
40
41
import com .alipay .oceanbase .rpc .stream .ObTableClientQueryStreamResult ;
41
42
import com .alipay .sofa .common .thread .SofaThreadPoolExecutor ;
42
- import com .alipay .oceanbase .hbase .exception .OperationTimeoutException ;
43
43
44
44
import com .google .protobuf .Descriptors ;
45
45
import com .google .protobuf .Message ;
@@ -368,8 +368,31 @@ public boolean exists(Get get) throws IOException {
368
368
return !r .isEmpty ();
369
369
}
370
370
371
+ @ Override
372
+ public boolean [] existsAll (List <Get > list ) throws IOException {
373
+ if (list .isEmpty ()) {
374
+ return new boolean []{};
375
+ }
376
+ if (list .size () == 1 ) {
377
+ return new boolean []{exists (list .get (0 ))};
378
+ }
379
+
380
+ // todo: Optimize after CheckExistenceOnly is finished
381
+ Result [] r = get (list );
382
+ boolean [] ret = new boolean [r .length ];
383
+ for (int i = 0 ; i < r .length ; ++i ){
384
+ ret [i ] = !r [i ].isEmpty ();
385
+ }
386
+ return ret ;
387
+ }
388
+
371
389
public Boolean [] exists (List <Get > gets ) throws IOException {
372
- throw new FeatureNotSupportedException ("not supported yet'" );
390
+ Boolean [] result = new Boolean [gets .size ()];
391
+ boolean [] exists = existsAll (gets );
392
+ for (int i = 0 ; i < gets .size (); ++i ) {
393
+ result [i ] = exists [i ];
394
+ }
395
+ return result ;
373
396
}
374
397
375
398
public void batch (List <? extends Row > actions , Object [] results ) {
@@ -460,9 +483,14 @@ public Result call() throws IOException {
460
483
461
484
filter = buildObHTableFilter (get .getFilter (), get .getTimeRange (),
462
485
get .getMaxVersions (), entry .getValue ());
463
-
464
- obTableQuery = buildObTableQuery (filter , get .getRow (), true ,
465
- get .getRow (), true , -1 );
486
+ if (get .isClosestRowBefore ()) {
487
+ obTableQuery = buildObTableQuery (filter , null , false ,
488
+ get .getRow (), true , 1 );
489
+ obTableQuery .setScanOrder (ObScanOrder .Reverse );
490
+ } else {
491
+ obTableQuery = buildObTableQuery (filter , get .getRow (), true ,
492
+ get .getRow (), true , -1 );
493
+ }
466
494
467
495
request = buildObTableQueryRequest (obTableQuery ,
468
496
getTargetTableName (tableNameString , Bytes .toString (family )));
@@ -535,6 +563,9 @@ public ResultScanner call() throws IOException {
535
563
if (scan .isReversed ()) { // reverse scan 时设置为逆序
536
564
obTableQuery .setScanOrder (ObScanOrder .Reverse );
537
565
}
566
+ obTableQuery .setMaxResultSize (scan .getMaxResultSize () > 0 ? scan .getMaxResultSize () : conf .getLong (
567
+ HConstants .HBASE_CLIENT_SCANNER_MAX_RESULT_SIZE_KEY ,
568
+ HConstants .DEFAULT_HBASE_CLIENT_SCANNER_MAX_RESULT_SIZE ));
538
569
request = buildObTableQueryAsyncRequest (obTableQuery , getTargetTableName (tableNameString ));
539
570
clientQueryAsyncStreamResult = (ObTableClientQueryAsyncStreamResult ) obTableClient
540
571
.execute (request );
@@ -558,7 +589,9 @@ public ResultScanner call() throws IOException {
558
589
}
559
590
560
591
// no support set maxResultSize.
561
- // obTableQuery.setMaxResultSize(scan.getMaxResultSize());
592
+ obTableQuery .setMaxResultSize (scan .getMaxResultSize () > 0 ? scan .getMaxResultSize () : conf .getLong (
593
+ HConstants .HBASE_CLIENT_SCANNER_MAX_RESULT_SIZE_KEY ,
594
+ HConstants .DEFAULT_HBASE_CLIENT_SCANNER_MAX_RESULT_SIZE ));
562
595
563
596
request = buildObTableQueryAsyncRequest (obTableQuery ,
564
597
getTargetTableName (tableNameString , Bytes .toString (family )));
@@ -655,7 +688,12 @@ private void validatePut(Put put) {
655
688
*/
656
689
public boolean checkAndPut (byte [] row , byte [] family , byte [] qualifier , byte [] value , Put put )
657
690
throws IOException {
658
- return checkAndMutation (row , family , qualifier , value , put );
691
+ return checkAndPut (row , family , qualifier , CompareFilter .CompareOp .EQUAL , value , put );
692
+ }
693
+
694
+ @ Override
695
+ public boolean checkAndPut (byte [] row , byte [] family , byte [] qualifier , CompareFilter .CompareOp compareOp , byte [] value , Put put ) throws IOException {
696
+ return checkAndMutation (row , family , qualifier , compareOp , value , put );
659
697
}
660
698
661
699
private void innerDelete (Delete delete ) throws IOException {
@@ -708,10 +746,15 @@ public void delete(List<Delete> deletes) throws IOException {
708
746
*/
709
747
public boolean checkAndDelete (byte [] row , byte [] family , byte [] qualifier , byte [] value ,
710
748
Delete delete ) throws IOException {
711
- return checkAndMutation (row , family , qualifier , value , delete );
749
+ return checkAndDelete (row , family , qualifier , CompareFilter . CompareOp . EQUAL , value , delete );
712
750
}
713
751
714
- private boolean checkAndMutation (byte [] row , byte [] family , byte [] qualifier , byte [] value ,
752
+ @ Override
753
+ public boolean checkAndDelete (byte [] row , byte [] family , byte [] qualifier , CompareFilter .CompareOp compareOp , byte [] value , Delete delete ) throws IOException {
754
+ return checkAndMutation (row , family , qualifier , compareOp , value , delete );
755
+ }
756
+
757
+ private boolean checkAndMutation (byte [] row , byte [] family , byte [] qualifier , CompareFilter .CompareOp compareOp , byte [] value ,
715
758
Mutation mutation ) throws IOException {
716
759
try {
717
760
checkArgument (row != null , "row is null" );
@@ -721,7 +764,7 @@ private boolean checkAndMutation(byte[] row, byte[] family, byte[] qualifier, by
721
764
722
765
checkArgument (!mutation .isEmpty (), "mutation is empty" );
723
766
724
- String filterString = buildCheckAndMutateFilterString (family , qualifier , value );
767
+ String filterString = buildCheckAndMutateFilterString (family , qualifier , compareOp , value );
725
768
726
769
ObHTableFilter filter = buildObHTableFilter (filterString , null , 1 , qualifier );
727
770
@@ -1120,13 +1163,30 @@ public boolean checkAndMutate(byte[] row, byte[] family, byte[] qualifier,
1120
1163
throw new FeatureNotSupportedException ("not supported yet'" );
1121
1164
}
1122
1165
1166
+ // operationTimeout means operation max wait time in client
1167
+ @ Override
1123
1168
public void setOperationTimeout (int operationTimeout ) {
1124
1169
this .operationTimeout = operationTimeout ;
1125
1170
this .operationExecuteInPool = this .configuration .getBoolean (
1126
1171
HBASE_CLIENT_OPERATION_EXECUTE_IN_POOL ,
1127
1172
(this .operationTimeout != HConstants .DEFAULT_HBASE_CLIENT_OPERATION_TIMEOUT ));
1128
1173
}
1129
1174
1175
+ @ Override
1176
+ public int getOperationTimeout () {
1177
+ return operationTimeout ;
1178
+ }
1179
+
1180
+ // rpcTimeout means server max execute time, equal Table API rpc_execute_time, it must be set before OHTable init; please pass this parameter through conf
1181
+ @ Override
1182
+ public void setRpcTimeout (int rpcTimeout ) {
1183
+ }
1184
+
1185
+ @ Override
1186
+ public int getRpcTimeout () {
1187
+ return Integer .parseInt (configuration .get (Property .RPC_EXECUTE_TIMEOUT .getKey ()));
1188
+ }
1189
+
1130
1190
public void setRuntimeBatchExecutor (ExecutorService runtimeBatchExecutor ) {
1131
1191
this .obTableClient .setRuntimeBatchExecutor (runtimeBatchExecutor );
1132
1192
}
@@ -1215,13 +1275,13 @@ private ObHTableFilter buildObHTableFilter(Filter filter, TimeRange timeRange, i
1215
1275
return obHTableFilter ;
1216
1276
}
1217
1277
1218
- private String buildCheckAndMutateFilterString (byte [] family , byte [] qualifier , byte [] value ) {
1278
+ private String buildCheckAndMutateFilterString (byte [] family , byte [] qualifier , CompareFilter . CompareOp compareOp , byte [] value ) {
1219
1279
if (value != null ) {
1220
- return ("CheckAndMutateFilter(= , 'binary:" + Bytes .toString (value ) + "', '"
1280
+ return ("CheckAndMutateFilter(" + HBaseFilterUtils . toParseableString ( compareOp ) + " , 'binary:" + Bytes .toString (value ) + "', '"
1221
1281
+ Bytes .toString (family ) + "', '"
1222
1282
+ (qualifier == null ? "" : Bytes .toString (qualifier )) + "', false)" );
1223
1283
} else {
1224
- return ("CheckAndMutateFilter(= , 'binary:', '" + Bytes .toString (family ) + "', '"
1284
+ return ("CheckAndMutateFilter(" + HBaseFilterUtils . toParseableString ( compareOp ) + " , 'binary:', '" + Bytes .toString (family ) + "', '"
1225
1285
+ (qualifier == null ? "" : Bytes .toString (qualifier )) + "', true)" );
1226
1286
}
1227
1287
}
0 commit comments