54
54
import org .apache .hadoop .hbase .client .coprocessor .Batch ;
55
55
import org .apache .hadoop .hbase .filter .CompareFilter ;
56
56
import org .apache .hadoop .hbase .filter .Filter ;
57
+ import org .apache .hadoop .hbase .filter .FilterList ;
58
+ import org .apache .hadoop .hbase .filter .PageFilter ;
57
59
import org .apache .hadoop .hbase .io .TimeRange ;
58
60
import org .apache .hadoop .hbase .ipc .CoprocessorRpcChannel ;
59
61
import org .apache .hadoop .hbase .util .Bytes ;
@@ -503,24 +505,10 @@ public Result call() throws IOException {
503
505
ObTableClientQueryStreamResult clientQueryStreamResult ;
504
506
ObTableQueryRequest request ;
505
507
ObTableQuery obTableQuery ;
506
- ObHTableFilter filter ;
507
508
try {
508
509
if (get .getFamilyMap ().keySet () == null
509
510
|| get .getFamilyMap ().keySet ().size () == 0 ) {
510
- filter = buildObHTableFilter (get .getFilter (), get .getTimeRange (),
511
- get .getMaxVersions (), null );
512
- if (get .isClosestRowBefore ()) {
513
- Scan scan = new Scan ();
514
- scan .setStartRow (get .getRow ());
515
- scan .setCaching (1 );
516
- scan .setReversed (true );
517
- obTableQuery = buildObTableQuery (filter , scan );
518
- obTableQuery .setObKVParams (buildOBKVParams (scan ));
519
- } else {
520
- obTableQuery = buildObTableQuery (filter , get .getRow (), true ,
521
- get .getRow (), true );
522
- obTableQuery .setObKVParams (buildOBKVParams (get ));
523
- }
511
+ obTableQuery = buildObTableQuery (get , null );
524
512
request = buildObTableQueryRequest (obTableQuery ,
525
513
getTargetTableName (tableNameString ));
526
514
@@ -531,19 +519,7 @@ public Result call() throws IOException {
531
519
for (Map .Entry <byte [], NavigableSet <byte []>> entry : get .getFamilyMap ()
532
520
.entrySet ()) {
533
521
family = entry .getKey ();
534
- filter = buildObHTableFilter (get .getFilter (), get .getTimeRange (),
535
- get .getMaxVersions (), entry .getValue ());
536
- if (get .isClosestRowBefore ()) {
537
- Scan scan = new Scan (get .getRow ());
538
- scan .setCaching (1 );
539
- scan .setReversed (true );
540
- obTableQuery = buildObTableQuery (filter , scan );
541
- obTableQuery .setObKVParams (buildOBKVParams (scan ));
542
- } else {
543
- obTableQuery = buildObTableQuery (filter , get .getRow (), true ,
544
- get .getRow (), true );
545
- obTableQuery .setObKVParams (buildOBKVParams (get ));
546
- }
522
+ obTableQuery = buildObTableQuery (get , entry .getValue ());
547
523
request = buildObTableQueryRequest (obTableQuery ,
548
524
getTargetTableName (tableNameString , Bytes .toString (family )));
549
525
clientQueryStreamResult = (ObTableClientQueryStreamResult ) obTableClient
@@ -870,7 +846,7 @@ private boolean checkAndMutation(byte[] row, byte[] family, byte[] qualifier, Co
870
846
keyValueList .addAll (entry .getValue ());
871
847
}
872
848
}
873
- ObTableQuery obTableQuery = buildObTableQuery (filter , row , true , row , true );
849
+ ObTableQuery obTableQuery = buildObTableQuery (filter , row , true , row , true , false );
874
850
875
851
ObTableBatchOperation batch = buildObTableBatchOperation (keyValueList , false , null );
876
852
@@ -908,7 +884,7 @@ public Result append(Append append) throws IOException {
908
884
true , qualifiers );
909
885
// the later hbase has supported timeRange
910
886
ObHTableFilter filter = buildObHTableFilter (null , null , 1 , qualifiers );
911
- ObTableQuery obTableQuery = buildObTableQuery (filter , r , true , r , true );
887
+ ObTableQuery obTableQuery = buildObTableQuery (filter , r , true , r , true , false );
912
888
ObTableQueryAndMutate queryAndMutate = new ObTableQueryAndMutate ();
913
889
queryAndMutate .setTableQuery (obTableQuery );
914
890
queryAndMutate .setMutations (batchOperation );
@@ -963,11 +939,11 @@ public Result increment(Increment increment) throws IOException {
963
939
batch .addTableOperation (getInstance (INCREMENT , new Object [] { rowKey , qualifier ,
964
940
Long .MAX_VALUE }, V_COLUMNS , new Object [] { cell .getValue () }));
965
941
});
966
-
942
+
967
943
ObHTableFilter filter = buildObHTableFilter (null , increment .getTimeRange (), 1 ,
968
944
qualifiers );
969
945
970
- ObTableQuery obTableQuery = buildObTableQuery (filter , rowKey , true , rowKey , true );
946
+ ObTableQuery obTableQuery = buildObTableQuery (filter , rowKey , true , rowKey , true , false );
971
947
ObTableQueryAndMutate queryAndMutate = new ObTableQueryAndMutate ();
972
948
queryAndMutate .setMutations (batch );
973
949
queryAndMutate .setTableQuery (obTableQuery );
@@ -1017,7 +993,7 @@ public long incrementColumnValue(byte[] row, byte[] family, byte[] qualifier, lo
1017
993
1018
994
ObHTableFilter filter = buildObHTableFilter (null , null , 1 , qualifiers );
1019
995
1020
- ObTableQuery obTableQuery = buildObTableQuery (filter , row , true , row , true );
996
+ ObTableQuery obTableQuery = buildObTableQuery (filter , row , true , row , true , false );
1021
997
ObTableQueryAndMutate queryAndMutate = new ObTableQueryAndMutate ();
1022
998
queryAndMutate .setMutations (batch );
1023
999
queryAndMutate .setTableQuery (obTableQuery );
@@ -1419,7 +1395,8 @@ private ObHTableFilter buildObHTableFilter(String filterString, TimeRange timeRa
1419
1395
}
1420
1396
1421
1397
private ObTableQuery buildObTableQuery (ObHTableFilter filter , byte [] start ,
1422
- boolean includeStart , byte [] stop , boolean includeStop ) {
1398
+ boolean includeStart , byte [] stop , boolean includeStop ,
1399
+ boolean isReversed ) {
1423
1400
ObNewRange obNewRange = new ObNewRange ();
1424
1401
1425
1402
if (Arrays .equals (start , HConstants .EMPTY_BYTE_ARRAY )) {
@@ -1440,6 +1417,9 @@ private ObTableQuery buildObTableQuery(ObHTableFilter filter, byte[] start,
1440
1417
obNewRange .setEndKey (ObRowKey .getInstance (stop , ObObj .getMin (), ObObj .getMin ()));
1441
1418
}
1442
1419
ObTableQuery obTableQuery = new ObTableQuery ();
1420
+ if (isReversed ) {
1421
+ obTableQuery .setScanOrder (ObScanOrder .Reverse );
1422
+ }
1443
1423
obTableQuery .setIndexName ("PRIMARY" );
1444
1424
obTableQuery .sethTableFilter (filter );
1445
1425
for (String column : ALL_COLUMNS ) {
@@ -1459,11 +1439,10 @@ private ObTableQuery buildObTableQuery(ObHTableFilter filter, final Scan scan) {
1459
1439
}
1460
1440
if (scan .isReversed ()) {
1461
1441
obTableQuery = buildObTableQuery (filter , scan .getStopRow (), false , scan .getStartRow (),
1462
- true );
1463
- obTableQuery .setScanOrder (ObScanOrder .Reverse );
1442
+ true , true );
1464
1443
} else {
1465
1444
obTableQuery = buildObTableQuery (filter , scan .getStartRow (), true , scan .getStopRow (),
1466
- false );
1445
+ false , false );
1467
1446
}
1468
1447
if (scan .getBatch () > 0 ) {
1469
1448
obTableQuery .setBatchSize (scan .getBatch ());
@@ -1475,6 +1454,29 @@ private ObTableQuery buildObTableQuery(ObHTableFilter filter, final Scan scan) {
1475
1454
return obTableQuery ;
1476
1455
}
1477
1456
1457
+ private ObTableQuery buildObTableQuery (final Get get , Collection <byte []> columnQualifiers ) {
1458
+ ObTableQuery obTableQuery ;
1459
+ if (get .isClosestRowBefore ()) {
1460
+ PageFilter pageFilter = new PageFilter (1 );
1461
+ FilterList filterList = new FilterList (FilterList .Operator .MUST_PASS_ALL );
1462
+ filterList .addFilter (pageFilter );
1463
+ if (null != get .getFilter ()) {
1464
+ filterList .addFilter (get .getFilter ());
1465
+ }
1466
+ get .setFilter (filterList );
1467
+ }
1468
+ ObHTableFilter filter = buildObHTableFilter (get .getFilter (), get .getTimeRange (),
1469
+ get .getMaxVersions (), columnQualifiers );
1470
+ if (get .isClosestRowBefore ()) {
1471
+ obTableQuery = buildObTableQuery (filter , HConstants .EMPTY_BYTE_ARRAY , true ,
1472
+ get .getRow (), true , true );
1473
+ } else {
1474
+ obTableQuery = buildObTableQuery (filter , get .getRow (), true , get .getRow (), true , false );
1475
+ }
1476
+ obTableQuery .setObKVParams (buildOBKVParams (get ));
1477
+ return obTableQuery ;
1478
+ }
1479
+
1478
1480
private ObTableBatchOperation buildObTableBatchOperation (List <KeyValue > keyValueList ,
1479
1481
boolean putToAppend ,
1480
1482
List <byte []> qualifiers ) {
0 commit comments