35
35
import com .alipay .oceanbase .rpc .protocol .payload .impl .execute .query .*;
36
36
import com .alipay .oceanbase .rpc .protocol .payload .impl .execute .syncquery .ObTableQueryAsyncRequest ;
37
37
import com .alipay .oceanbase .rpc .stream .ObTableClientQueryAsyncStreamResult ;
38
- import com .alipay .oceanbase .rpc .stream .ObTableClientQueryStreamResult ;
39
38
import com .alipay .oceanbase .rpc .table .ObHBaseParams ;
40
39
import com .alipay .oceanbase .rpc .table .ObKVParams ;
41
40
import com .alipay .sofa .common .thread .SofaThreadPoolExecutor ;
49
48
import org .apache .hadoop .hbase .*;
50
49
import org .apache .hadoop .hbase .client .*;
51
50
import org .apache .hadoop .hbase .client .coprocessor .Batch ;
52
- import org .apache .hadoop .hbase .filter .CompareFilter ;
53
- import org .apache .hadoop .hbase .filter .Filter ;
54
- import org .apache .hadoop .hbase .filter .FilterList ;
55
- import org .apache .hadoop .hbase .filter .PageFilter ;
51
+ import org .apache .hadoop .hbase .filter .*;
56
52
import org .apache .hadoop .hbase .io .TimeRange ;
57
53
import org .apache .hadoop .hbase .ipc .CoprocessorRpcChannel ;
58
54
import org .apache .hadoop .hbase .util .Bytes ;
@@ -401,8 +397,8 @@ public HTableDescriptor getTableDescriptor() {
401
397
*/
402
398
@ Override
403
399
public boolean exists (Get get ) throws IOException {
404
- Result r = get ( get );
405
- return ! r . isEmpty ();
400
+ get . setCheckExistenceOnly ( true );
401
+ return this . get ( get ). getExists ();
406
402
}
407
403
408
404
@ Override
@@ -455,27 +451,51 @@ public <R> Object[] batchCallback(List<? extends Row> actions, Batch.Callback<R>
455
451
throw new FeatureNotSupportedException ("not supported yet'" );
456
452
}
457
453
458
- private void getKeyValueFromResult (AbstractQueryStreamResult clientQueryStreamResult ,
459
- List <KeyValue > keyValueList , boolean isTableGroup ,
460
- byte [] family ) throws Exception {
454
+ public static int compareByteArray (byte [] bt1 , byte [] bt2 ) {
455
+ int minLength = Math .min (bt1 .length , bt2 .length );
456
+ for (int i = 0 ; i < minLength ; i ++) {
457
+ if (bt1 [i ] != bt2 [i ]) {
458
+ return bt1 [i ] - bt2 [i ];
459
+ }
460
+ }
461
+ return bt1 .length - bt2 .length ;
462
+ }
463
+
464
+ private void getMaxRowFromResult (AbstractQueryStreamResult clientQueryStreamResult ,
465
+ List <KeyValue > keyValueList , boolean isTableGroup ,
466
+ byte [] family ) throws Exception {
461
467
byte [][] familyAndQualifier = new byte [2 ][];
468
+ KeyValue kv = null ;
462
469
while (clientQueryStreamResult .next ()) {
463
470
List <ObObj > row = clientQueryStreamResult .getRow ();
464
- if (isTableGroup ) {
465
- // split family and qualifier
466
- familyAndQualifier = OHBaseFuncUtils . extractFamilyFromQualifier (( byte []) row . get ( 1 )
467
- . getValue ()) ;
471
+ if (row . isEmpty () ) {
472
+ // Currently, checkExistOnly is set, and if the row exists, it returns an empty row.
473
+ keyValueList . add ( new KeyValue ());
474
+ return ;
468
475
} else {
469
- familyAndQualifier [0 ] = family ;
470
- familyAndQualifier [1 ] = (byte []) row .get (1 ).getValue ();
476
+ if (kv == null
477
+ || compareByteArray (kv .getRow (), (byte []) row .get (0 ).getValue ()) <= 0 ) {
478
+ if (kv != null
479
+ && compareByteArray (kv .getRow (), (byte []) row .get (0 ).getValue ()) != 0 ) {
480
+ keyValueList .clear ();
481
+ }
482
+ if (isTableGroup ) {
483
+ // split family and qualifier
484
+ familyAndQualifier = OHBaseFuncUtils
485
+ .extractFamilyFromQualifier ((byte []) row .get (1 ).getValue ());
486
+ } else {
487
+ familyAndQualifier [0 ] = family ;
488
+ familyAndQualifier [1 ] = (byte []) row .get (1 ).getValue ();
489
+ }
490
+ kv = new KeyValue ((byte []) row .get (0 ).getValue (),//K
491
+ familyAndQualifier [0 ], // family
492
+ familyAndQualifier [1 ], // qualifiermat
493
+ (Long ) row .get (2 ).getValue (), // T
494
+ (byte []) row .get (3 ).getValue ()// V
495
+ );
496
+ keyValueList .add (kv );
497
+ }
471
498
}
472
- KeyValue kv = new KeyValue ((byte []) row .get (0 ).getValue (),//K
473
- familyAndQualifier [0 ], // family
474
- familyAndQualifier [1 ], // qualifier
475
- (Long ) row .get (2 ).getValue (), // T
476
- (byte []) row .get (3 ).getValue ()// V
477
- );
478
- keyValueList .add (kv );
479
499
}
480
500
}
481
501
@@ -491,7 +511,8 @@ private String getTargetTableName(String tableNameString) {
491
511
// To enable the server to identify the column family to which a qualifier belongs,
492
512
// the client writes the column family name into the qualifier.
493
513
// The server then parses this information to determine the table that needs to be operated on.
494
- private void processColumnFilters (NavigableSet <byte []> columnFilters , Map <byte [], NavigableSet <byte []>> familyMap ) {
514
+ private void processColumnFilters (NavigableSet <byte []> columnFilters ,
515
+ Map <byte [], NavigableSet <byte []>> familyMap ) {
495
516
for (Map .Entry <byte [], NavigableSet <byte []>> entry : familyMap .entrySet ()) {
496
517
if (entry .getValue () != null ) {
497
518
for (byte [] columnName : entry .getValue ()) {
@@ -537,7 +558,7 @@ public Result call() throws IOException {
537
558
538
559
clientQueryStreamResult = (ObTableClientQueryAsyncStreamResult ) obTableClient
539
560
.execute (request );
540
- getKeyValueFromResult (clientQueryStreamResult , keyValueList , true , family );
561
+ getMaxRowFromResult (clientQueryStreamResult , keyValueList , true , family );
541
562
} else {
542
563
for (Map .Entry <byte [], NavigableSet <byte []>> entry : get .getFamilyMap ()
543
564
.entrySet ()) {
@@ -548,7 +569,7 @@ public Result call() throws IOException {
548
569
configuration ));
549
570
clientQueryStreamResult = (ObTableClientQueryAsyncStreamResult ) obTableClient
550
571
.execute (request );
551
- getKeyValueFromResult (clientQueryStreamResult , keyValueList , false ,
572
+ getMaxRowFromResult (clientQueryStreamResult , keyValueList , false ,
552
573
family );
553
574
}
554
575
}
@@ -558,6 +579,9 @@ public Result call() throws IOException {
558
579
throw new IOException ("query table:" + tableNameString + " family "
559
580
+ Bytes .toString (family ) + " error." , e );
560
581
}
582
+ if (get .isCheckExistenceOnly ()) {
583
+ return Result .create (null , !keyValueList .isEmpty ());
584
+ }
561
585
return new Result (keyValueList );
562
586
}
563
587
};
@@ -1110,7 +1134,7 @@ public void flushCommits() throws IOException {
1110
1134
// Bypass logic: directly construct BatchOperation for puts with family map size > 1
1111
1135
try {
1112
1136
BatchOperation batch = buildBatchOperation (this .tableNameString ,
1113
- innerFamilyMap , false , null );
1137
+ innerFamilyMap , false , null );
1114
1138
BatchOperationResult results = batch .execute ();
1115
1139
1116
1140
boolean hasError = results .hasError ();
@@ -1120,20 +1144,20 @@ public void flushCommits() throws IOException {
1120
1144
}
1121
1145
} catch (Exception e ) {
1122
1146
logger .error (LCD .convert ("01-00008" ), tableNameString , null , autoFlush ,
1123
- writeBuffer .size (), e );
1147
+ writeBuffer .size (), e );
1124
1148
throw new IOException ("put table " + tableNameString + " error codes "
1125
- + null + "auto flush " + autoFlush
1126
- + " current buffer size " + writeBuffer .size (), e );
1149
+ + null + "auto flush " + autoFlush
1150
+ + " current buffer size " + writeBuffer .size (), e );
1127
1151
}
1128
1152
} else {
1129
1153
// Existing logic for puts with family map size = 1
1130
1154
for (Map .Entry <byte [], List <KeyValue >> entry : innerFamilyMap .entrySet ()) {
1131
1155
String family = Bytes .toString (entry .getKey ());
1132
1156
Pair <List <Integer >, List <KeyValue >> keyValueWithIndex = familyMap
1133
- .get (family );
1157
+ .get (family );
1134
1158
if (keyValueWithIndex == null ) {
1135
1159
keyValueWithIndex = new Pair <List <Integer >, List <KeyValue >>(
1136
- new ArrayList <Integer >(), new ArrayList <KeyValue >());
1160
+ new ArrayList <Integer >(), new ArrayList <KeyValue >());
1137
1161
familyMap .put (family , keyValueWithIndex );
1138
1162
}
1139
1163
keyValueWithIndex .getFirst ().add (i );
@@ -1414,7 +1438,8 @@ private static String getTestLoadTargetTableName(String tableNameString, String
1414
1438
}
1415
1439
1416
1440
private ObHTableFilter buildObHTableFilter (Filter filter , TimeRange timeRange , int maxVersion ,
1417
- Collection <byte []> columnQualifiers ) throws IOException {
1441
+ Collection <byte []> columnQualifiers )
1442
+ throws IOException {
1418
1443
ObHTableFilter obHTableFilter = new ObHTableFilter ();
1419
1444
1420
1445
if (filter != null ) {
@@ -1440,7 +1465,9 @@ private ObHTableFilter buildObHTableFilter(Filter filter, TimeRange timeRange, i
1440
1465
return obHTableFilter ;
1441
1466
}
1442
1467
1443
- private byte [] buildCheckAndMutateFilterString (byte [] family , byte [] qualifier , CompareFilter .CompareOp compareOp , byte [] value ) throws IOException {
1468
+ private byte [] buildCheckAndMutateFilterString (byte [] family , byte [] qualifier ,
1469
+ CompareFilter .CompareOp compareOp , byte [] value )
1470
+ throws IOException {
1444
1471
ByteArrayOutputStream byteStream = new ByteArrayOutputStream ();
1445
1472
byteStream .write ("CheckAndMutateFilter(" .getBytes ());
1446
1473
byteStream .write (HBaseFilterUtils .toParseableByteArray (compareOp ));
@@ -1545,7 +1572,8 @@ private ObTableQuery buildObTableQuery(ObHTableFilter filter, final Scan scan) {
1545
1572
return obTableQuery ;
1546
1573
}
1547
1574
1548
- private ObTableQuery buildObTableQuery (final Get get , Collection <byte []> columnQualifiers ) throws IOException {
1575
+ private ObTableQuery buildObTableQuery (final Get get , Collection <byte []> columnQualifiers )
1576
+ throws IOException {
1549
1577
ObTableQuery obTableQuery ;
1550
1578
if (get .isClosestRowBefore ()) {
1551
1579
PageFilter pageFilter = new PageFilter (1 );
@@ -1593,11 +1621,11 @@ private ObTableBatchOperation buildObTableBatchOperation(Map<byte[], List<KeyVal
1593
1621
for (KeyValue kv : keyValueList ) {
1594
1622
if (qualifiers != null ) {
1595
1623
qualifiers
1596
- .add ((Bytes .toString (family ) + "." + Bytes .toString (kv .getQualifier ()))
1597
- .getBytes ());
1624
+ .add ((Bytes .toString (family ) + "." + Bytes .toString (kv .getQualifier ()))
1625
+ .getBytes ());
1598
1626
}
1599
1627
KeyValue new_kv = modifyQualifier (kv ,
1600
- (Bytes .toString (family ) + "." + Bytes .toString (kv .getQualifier ())).getBytes ());
1628
+ (Bytes .toString (family ) + "." + Bytes .toString (kv .getQualifier ())).getBytes ());
1601
1629
batch .addTableOperation (buildObTableOperation (new_kv , putToAppend ));
1602
1630
}
1603
1631
}
@@ -1636,6 +1664,7 @@ private com.alipay.oceanbase.rpc.mutation.Mutation buildMutation(KeyValue kv,
1636
1664
throw new IllegalArgumentException ("illegal mutation type " + kvType );
1637
1665
}
1638
1666
}
1667
+
1639
1668
private KeyValue modifyQualifier (KeyValue original , byte [] newQualifier ) {
1640
1669
// Extract existing components
1641
1670
byte [] row = original .getRow ();
@@ -1645,7 +1674,7 @@ private KeyValue modifyQualifier(KeyValue original, byte[] newQualifier) {
1645
1674
byte type = original .getTypeByte ();
1646
1675
// Create a new KeyValue with the modified qualifier
1647
1676
return new KeyValue (row , family , newQualifier , timestamp , KeyValue .Type .codeToType (type ),
1648
- value );
1677
+ value );
1649
1678
}
1650
1679
1651
1680
private BatchOperation buildBatchOperation (String tableName ,
@@ -1661,7 +1690,7 @@ private BatchOperation buildBatchOperation(String tableName,
1661
1690
qualifiers .add (kv .getQualifier ());
1662
1691
}
1663
1692
KeyValue new_kv = modifyQualifier (kv ,
1664
- (Bytes .toString (family ) + "." + Bytes .toString (kv .getQualifier ())).getBytes ());
1693
+ (Bytes .toString (family ) + "." + Bytes .toString (kv .getQualifier ())).getBytes ());
1665
1694
batch .addOperation (buildMutation (new_kv , putToAppend ));
1666
1695
}
1667
1696
}
@@ -1670,7 +1699,6 @@ private BatchOperation buildBatchOperation(String tableName,
1670
1699
return batch ;
1671
1700
}
1672
1701
1673
-
1674
1702
private BatchOperation buildBatchOperation (String tableName , List <KeyValue > keyValueList ,
1675
1703
boolean putToAppend , List <byte []> qualifiers ) {
1676
1704
BatchOperation batch = obTableClient .batchOperation (tableName );
@@ -1771,7 +1799,6 @@ public static void checkFamilyViolation(Collection<byte[]> families, boolean che
1771
1799
}
1772
1800
}
1773
1801
1774
-
1775
1802
// This method is currently only used for append and increment operations.
1776
1803
// It restricts these two methods to use multi-column family operations.
1777
1804
// Note: After completing operations on multiple column families, they are deleted using the method described above.
0 commit comments