26
26
import com .alipay .oceanbase .hbase .util .ObTableClientManager ;
27
27
import com .alipay .oceanbase .hbase .util .TableHBaseLoggerFactory ;
28
28
import com .alipay .oceanbase .rpc .ObTableClient ;
29
+ import com .alipay .oceanbase .rpc .filter .ObHBaseParams ;
30
+ import com .alipay .oceanbase .rpc .filter .ObParamsBase ;
31
+ import com .alipay .oceanbase .rpc .filter .ObParams ;
29
32
import com .alipay .oceanbase .rpc .mutation .BatchOperation ;
30
33
import com .alipay .oceanbase .rpc .mutation .result .BatchOperationResult ;
31
34
import com .alipay .oceanbase .rpc .property .Property ;
@@ -181,6 +184,8 @@ public class OHTable implements HTableInterface {
181
184
*/
182
185
private final Configuration configuration ;
183
186
187
+ private int scannerTimeout ;
188
+
184
189
/**
185
190
* Creates an object to access a HBase table.
186
191
* Shares oceanbase table obTableClient and other resources with other OHTable instances
@@ -203,6 +208,9 @@ public OHTable(Configuration configuration, String tableName) throws IOException
203
208
DEFAULT_HBASE_HTABLE_PRIVATE_THREADS_MAX );
204
209
this .keepAliveTime = configuration .getLong (HBASE_HTABLE_THREAD_KEEP_ALIVE_TIME ,
205
210
DEFAULT_HBASE_HTABLE_THREAD_KEEP_ALIVE_TIME );
211
+ HBaseConfiguration .getInt (configuration , HConstants .HBASE_CLIENT_SCANNER_TIMEOUT_PERIOD ,
212
+ HConstants .HBASE_REGIONSERVER_LEASE_PERIOD_KEY ,
213
+ HConstants .DEFAULT_HBASE_CLIENT_SCANNER_TIMEOUT_PERIOD );
206
214
this .executePool = createDefaultThreadPoolExecutor (1 , maxThreads , keepAliveTime );
207
215
this .obTableClient = ObTableClientManager .getOrCreateObTableClient (configuration );
208
216
@@ -364,8 +372,9 @@ public HTableDescriptor getTableDescriptor() {
364
372
* @throws IOException e
365
373
*/
366
374
public boolean exists (Get get ) throws IOException {
375
+ get .setCheckExistenceOnly (true );
367
376
Result r = get (get );
368
- return ! r . isEmpty ();
377
+ return r . getExists ();
369
378
}
370
379
371
380
@ Override
@@ -380,8 +389,8 @@ public boolean[] existsAll(List<Get> list) throws IOException {
380
389
// todo: Optimize after CheckExistenceOnly is finished
381
390
Result [] r = get (list );
382
391
boolean [] ret = new boolean [r .length ];
383
- for (int i = 0 ; i < r . length ; ++i ){
384
- ret [i ] = ! r [ i ]. isEmpty ( );
392
+ for (int i = 0 ; i < list . size () ; ++i ) {
393
+ ret [i ] = exists ( list . get ( i ) );
385
394
}
386
395
return ret ;
387
396
}
@@ -470,10 +479,20 @@ public Result call() throws IOException {
470
479
get .getMaxVersions (), null );
471
480
obTableQuery = buildObTableQuery (filter , get .getRow (), true , get .getRow (),
472
481
true , -1 );
482
+ obTableQuery .setObParams (buildObHBaseParams (null , get ));
473
483
request = buildObTableQueryRequest (obTableQuery , getTargetTableName (tableNameString ));
474
484
475
485
clientQueryStreamResult = (ObTableClientQueryStreamResult ) obTableClient
476
486
.execute (request );
487
+ if (get .isCheckExistenceOnly () ) {
488
+ Result result = new Result ();
489
+ if (clientQueryStreamResult .getCacheRows ().size () != 0 ) {
490
+ result .setExists (true );
491
+ } else {
492
+ result .setExists (false );
493
+ }
494
+ return result ;
495
+ }
477
496
getKeyValueFromResult (clientQueryStreamResult , keyValueList , true , family );
478
497
} else {
479
498
for (Map .Entry <byte [], NavigableSet <byte []>> entry : get .getFamilyMap ()
@@ -492,10 +511,20 @@ public Result call() throws IOException {
492
511
get .getRow (), true , -1 );
493
512
}
494
513
514
+ obTableQuery .setObParams (buildObHBaseParams (null , get ));
495
515
request = buildObTableQueryRequest (obTableQuery ,
496
516
getTargetTableName (tableNameString , Bytes .toString (family )));
497
517
clientQueryStreamResult = (ObTableClientQueryStreamResult ) obTableClient
498
518
.execute (request );
519
+ if (get .isCheckExistenceOnly () ) {
520
+ Result result = new Result ();
521
+ if (clientQueryStreamResult .getCacheRows ().size () != 0 ) {
522
+ result .setExists (true );
523
+ } else {
524
+ result .setExists (false );
525
+ }
526
+ return result ;
527
+ }
499
528
getKeyValueFromResult (clientQueryStreamResult , keyValueList , false ,
500
529
family );
501
530
}
@@ -555,17 +584,18 @@ public ResultScanner call() throws IOException {
555
584
scan .getMaxVersions (), null );
556
585
if (scan .isReversed ()) {
557
586
obTableQuery = buildObTableQuery (filter , scan .getStopRow (), false ,
558
- scan .getStartRow (), true , scan .getBatch ());
587
+ scan .getStartRow (), true , scan .getCaching ());
559
588
} else {
560
589
obTableQuery = buildObTableQuery (filter , scan .getStartRow (), true ,
561
- scan .getStopRow (), false , scan .getBatch ());
590
+ scan .getStopRow (), false , scan .getCaching ());
562
591
}
563
592
if (scan .isReversed ()) { // reverse scan 时设置为逆序
564
593
obTableQuery .setScanOrder (ObScanOrder .Reverse );
565
594
}
566
595
obTableQuery .setMaxResultSize (scan .getMaxResultSize () > 0 ? scan .getMaxResultSize () : conf .getLong (
567
596
HConstants .HBASE_CLIENT_SCANNER_MAX_RESULT_SIZE_KEY ,
568
597
HConstants .DEFAULT_HBASE_CLIENT_SCANNER_MAX_RESULT_SIZE ));
598
+ obTableQuery .setObParams (buildObHBaseParams (scan , null ));
569
599
request = buildObTableQueryAsyncRequest (obTableQuery , getTargetTableName (tableNameString ));
570
600
clientQueryAsyncStreamResult = (ObTableClientQueryAsyncStreamResult ) obTableClient
571
601
.execute (request );
@@ -579,20 +609,19 @@ public ResultScanner call() throws IOException {
579
609
scan .getMaxVersions (), entry .getValue ());
580
610
if (scan .isReversed ()) {
581
611
obTableQuery = buildObTableQuery (filter , scan .getStopRow (), false ,
582
- scan .getStartRow (), true , scan .getBatch ());
612
+ scan .getStartRow (), true , scan .getCaching ());
583
613
} else {
584
614
obTableQuery = buildObTableQuery (filter , scan .getStartRow (), true ,
585
- scan .getStopRow (), false , scan .getBatch ());
615
+ scan .getStopRow (), false , scan .getCaching ());
586
616
}
587
617
if (scan .isReversed ()) { // reverse scan 时设置为逆序
588
618
obTableQuery .setScanOrder (ObScanOrder .Reverse );
589
619
}
590
620
591
- // no support set maxResultSize.
592
621
obTableQuery .setMaxResultSize (scan .getMaxResultSize () > 0 ? scan .getMaxResultSize () : conf .getLong (
593
622
HConstants .HBASE_CLIENT_SCANNER_MAX_RESULT_SIZE_KEY ,
594
623
HConstants .DEFAULT_HBASE_CLIENT_SCANNER_MAX_RESULT_SIZE ));
595
-
624
+ obTableQuery . setObParams ( buildObHBaseParams ( scan , null ));
596
625
request = buildObTableQueryAsyncRequest (obTableQuery ,
597
626
getTargetTableName (tableNameString , Bytes .toString (family )));
598
627
clientQueryAsyncStreamResult = (ObTableClientQueryAsyncStreamResult ) obTableClient
@@ -614,6 +643,24 @@ public ResultScanner call() throws IOException {
614
643
return executeServerCallable (serverCallable );
615
644
}
616
645
646
+ public ObParams buildObHBaseParams (Scan scan , Get get ) {
647
+ ObParams obParams = new ObParams ();
648
+ ObHBaseParams obHBaseParams = new ObHBaseParams ();
649
+ if (scan != null ) {
650
+ obHBaseParams .setBatch (scan .getBatch ());
651
+ obHBaseParams .setCallTimeout (scannerTimeout );
652
+ obHBaseParams .setRaw (scan .isRaw ());
653
+ obHBaseParams .setCacheBlock (scan .isGetScan ());
654
+ obHBaseParams .setAllowPartialResults (scan .getAllowPartialResults ());
655
+ }
656
+ if (get != null ) {
657
+ obHBaseParams .setCheckExistenceOnly (get .isCheckExistenceOnly ());
658
+ obHBaseParams .setCacheBlock (get .getCacheBlocks ());
659
+ }
660
+ obParams .setObParamsBase (obHBaseParams );
661
+ return obParams ;
662
+ }
663
+
617
664
public ResultScanner getScanner (final byte [] family ) throws IOException {
618
665
Scan scan = new Scan ();
619
666
scan .addFamily (family );
@@ -1510,10 +1557,10 @@ public void refreshTableEntry(String familyString, boolean hasTestLoad) throws E
1510
1557
return ;
1511
1558
}
1512
1559
this .obTableClient .getOrRefreshTableEntry (
1513
- getNormalTargetTableName (tableNameString , familyString ), true , true );
1560
+ getNormalTargetTableName (tableNameString , familyString ), true , true , false );
1514
1561
if (hasTestLoad ) {
1515
1562
this .obTableClient .getOrRefreshTableEntry (
1516
- getTestLoadTargetTableName (tableNameString , familyString ), true , true );
1563
+ getTestLoadTargetTableName (tableNameString , familyString ), true , true , false );
1517
1564
}
1518
1565
}
1519
1566
0 commit comments