@@ -154,7 +154,7 @@ public class OHTable implements Table {
154
154
/**
155
155
* the buffer of put request
156
156
*/
157
- private final ArrayList <Put > writeBuffer = new ArrayList <Put >();
157
+ private final ArrayList <Put > writeBuffer = new ArrayList <Put >();
158
158
/**
159
159
* when the put request reach the write buffer size the do put will
160
160
* flush commits automatically
@@ -459,8 +459,8 @@ private void finishSetUp() {
459
459
}
460
460
461
461
public static OHConnectionConfiguration setUserDefinedNamespace (String tableNameString ,
462
- OHConnectionConfiguration ohConnectionConf )
463
- throws IllegalArgumentException {
462
+ OHConnectionConfiguration ohConnectionConf )
463
+ throws IllegalArgumentException {
464
464
if (tableNameString .indexOf (':' ) != -1 ) {
465
465
String [] params = tableNameString .split (":" );
466
466
if (params .length != 2 ) {
@@ -500,13 +500,15 @@ public Configuration getConfiguration() {
500
500
501
501
@ Override
502
502
public HTableDescriptor getTableDescriptor () throws IOException {
503
- OHTableDescriptorExecutor executor = new OHTableDescriptorExecutor (tableNameString , obTableClient );
503
+ OHTableDescriptorExecutor executor = new OHTableDescriptorExecutor (tableNameString ,
504
+ obTableClient );
504
505
return executor .getTableDescriptor ();
505
506
}
506
507
507
508
@ Override
508
509
public TableDescriptor getDescriptor () throws IOException {
509
- OHTableDescriptorExecutor executor = new OHTableDescriptorExecutor (tableNameString , obTableClient );
510
+ OHTableDescriptorExecutor executor = new OHTableDescriptorExecutor (tableNameString ,
511
+ obTableClient );
510
512
return executor .getTableDescriptor ();
511
513
}
512
514
@@ -620,8 +622,8 @@ private BatchOperation compatOldServerDel(final List<? extends Row> actions, fin
620
622
} else if (delete .getFamilyCellMap ().size () > 1 ) {
621
623
boolean has_delete_family = delete .getFamilyCellMap ().entrySet ().stream ()
622
624
.flatMap (entry -> entry .getValue ().stream ()).anyMatch (
623
- kv -> kv .getType (). getCode () == KeyValue .Type .DeleteFamily .getCode () ||
624
- kv .getType (). getCode () == KeyValue .Type .DeleteFamilyVersion .getCode ());
625
+ kv -> kv .getTypeByte () == KeyValue .Type .DeleteFamily .getCode () ||
626
+ kv .getTypeByte () == KeyValue .Type .DeleteFamilyVersion .getCode ());
625
627
if (!has_delete_family ) {
626
628
return buildBatchOperation (tableNameString ,
627
629
Collections .singletonList (delete ), true ,
@@ -909,10 +911,12 @@ private void processColumnFilters(NavigableSet<byte[]> columnFilters,
909
911
byte [] family = entry .getKey ();
910
912
if (entry .getValue () != null ) {
911
913
for (byte [] columnName : entry .getValue ()) {
912
- byte [] newQualifier = new byte [family .length + 1 /* length of "." */ + columnName .length ];
914
+ byte [] newQualifier = new byte [family .length + 1 /* length of "." */
915
+ + columnName .length ];
913
916
System .arraycopy (family , 0 , newQualifier , 0 , family .length );
914
917
newQualifier [family .length ] = 0x2E ; // 0x2E in utf-8 is "."
915
- System .arraycopy (columnName , 0 , newQualifier , family .length + 1 , columnName .length );
918
+ System .arraycopy (columnName , 0 , newQualifier , family .length + 1 ,
919
+ columnName .length );
916
920
columnFilters .add (newQualifier );
917
921
}
918
922
} else {
@@ -1402,8 +1406,8 @@ public boolean checkAndMutate(byte[] row, byte[] family, byte[] qualifier, Compa
1402
1406
}
1403
1407
1404
1408
@ Override
1405
- public CheckAndMutateBuilder checkAndMutate ( byte [] row , byte [] family ) {
1406
- return new ObCheckAndMutateBuilderImpl ( row , family );
1409
+ public long getRpcTimeout ( TimeUnit unit ) {
1410
+ return getRpcTimeout ( );
1407
1411
}
1408
1412
1409
1413
private boolean checkAndMutation (byte [] row , byte [] family , byte [] qualifier ,
@@ -1723,6 +1727,11 @@ public int getOperationTimeout() {
1723
1727
return operationTimeout ;
1724
1728
}
1725
1729
1730
+ @ Override
1731
+ public long getOperationTimeout (TimeUnit unit ) {
1732
+ return getOperationTimeout ();
1733
+ }
1734
+
1726
1735
//todo
1727
1736
@ Override
1728
1737
public void setRpcTimeout (int rpcTimeout ) {
@@ -1741,11 +1750,31 @@ public int getReadRpcTimeout() {
1741
1750
return this .readRpcTimeout ;
1742
1751
}
1743
1752
1753
+ @ Override
1754
+ public void setReadRpcTimeout (int readRpcTimeout ) {
1755
+ this .readRpcTimeout = readRpcTimeout ;
1756
+ }
1757
+
1758
+ @ Override
1759
+ public long getReadRpcTimeout (TimeUnit unit ) {
1760
+ return getReadRpcTimeout ();
1761
+ }
1762
+
1763
+ @ Override
1764
+ public long getWriteRpcTimeout (TimeUnit unit ) {
1765
+ return this .readRpcTimeout ;
1766
+ }
1767
+
1744
1768
@ Override
1745
1769
public int getWriteRpcTimeout () {
1746
1770
return this .writeRpcTimeout ;
1747
1771
}
1748
1772
1773
+ @ Override
1774
+ public void setWriteRpcTimeout (int writeRpcTimeout ) {
1775
+ this .writeRpcTimeout = writeRpcTimeout ;
1776
+ }
1777
+
1749
1778
public void setRuntimeBatchExecutor (ExecutorService runtimeBatchExecutor ) {
1750
1779
this .obTableClient .setRuntimeBatchExecutor (runtimeBatchExecutor );
1751
1780
}
@@ -1877,7 +1906,7 @@ private ObHTableFilter buildObHTableFilter(byte[] filterString, TimeRange timeRa
1877
1906
if (columnQualifier == null ) {
1878
1907
obHTableFilter .addSelectColumnQualifier (new byte [0 ]);
1879
1908
} else {
1880
- obHTableFilter .addSelectColumnQualifier (columnQualifier );
1909
+ obHTableFilter .addSelectColumnQualifier (columnQualifier );
1881
1910
}
1882
1911
}
1883
1912
}
@@ -1935,11 +1964,11 @@ private ObTableQuery buildObTableQuery(ObHTableFilter filter, final Scan scan) {
1935
1964
filter .setOffsetPerRowPerCf (scan .getRowOffsetPerColumnFamily ());
1936
1965
}
1937
1966
if (scan .isReversed ()) {
1938
- obTableQuery = buildObTableQuery (filter , scan .getStopRow (), scan .includeStopRow (), scan . getStartRow (),
1939
- scan .includeStartRow (), true , ts );
1967
+ obTableQuery = buildObTableQuery (filter , scan .getStopRow (), scan .includeStopRow (),
1968
+ scan . getStartRow (), scan .includeStartRow (), true , ts );
1940
1969
} else {
1941
- obTableQuery = buildObTableQuery (filter , scan .getStartRow (), scan .includeStartRow (), scan . getStopRow (),
1942
- scan .includeStopRow (), false , ts );
1970
+ obTableQuery = buildObTableQuery (filter , scan .getStartRow (), scan .includeStartRow (),
1971
+ scan . getStopRow (), scan .includeStopRow (), false , ts );
1943
1972
}
1944
1973
obTableQuery .setBatchSize (scan .getBatch ());
1945
1974
obTableQuery .setLimit (scan .getLimit ());
@@ -2016,13 +2045,14 @@ public static ObTableBatchOperation buildObTableBatchOperation(List<Mutation> ro
2016
2045
private QueryAndMutate buildDeleteQueryAndMutate (KeyValue kv ,
2017
2046
ObTableOperationType operationType ,
2018
2047
boolean isTableGroup , byte [] family , Long TTL ) {
2019
- KeyValue .Type kvType = KeyValue .Type .codeToType (kv .getType (). getCode ());
2048
+ KeyValue .Type kvType = KeyValue .Type .codeToType (kv .getTypeByte ());
2020
2049
com .alipay .oceanbase .rpc .mutation .Mutation tableMutation = buildMutation (kv , operationType ,
2021
2050
isTableGroup , family , TTL );
2022
- if (isTableGroup ) {
2051
+ if (isTableGroup ) {
2023
2052
// construct new_kv otherwise filter will fail to match targeted columns
2024
2053
byte [] oldQualifier = CellUtil .cloneQualifier (kv );
2025
- byte [] newQualifier = new byte [family .length + 1 /* length of "." */ + oldQualifier .length ];
2054
+ byte [] newQualifier = new byte [family .length + 1 /* length of "." */
2055
+ + oldQualifier .length ];
2026
2056
System .arraycopy (family , 0 , newQualifier , 0 , family .length );
2027
2057
newQualifier [family .length ] = 0x2E ; // 0x2E in utf-8 is "."
2028
2058
System .arraycopy (oldQualifier , 0 , newQualifier , family .length + 1 , oldQualifier .length );
@@ -2105,12 +2135,10 @@ private QueryAndMutate buildDeleteQueryAndMutate(KeyValue kv,
2105
2135
range .setEndKey (ObRowKey .getInstance (CellUtil .cloneRow (kv ), ObObj .getMax (),
2106
2136
ObObj .getMax ()));
2107
2137
if (!isTableGroup ) {
2108
- filter = buildObHTableFilter (null ,
2109
- new TimeRange (0 , kv .getTimestamp () + 1 ),
2138
+ filter = buildObHTableFilter (null , new TimeRange (0 , kv .getTimestamp () + 1 ),
2110
2139
Integer .MAX_VALUE );
2111
2140
} else {
2112
- filter = buildObHTableFilter (null ,
2113
- new TimeRange (0 , kv .getTimestamp () + 1 ),
2141
+ filter = buildObHTableFilter (null , new TimeRange (0 , kv .getTimestamp () + 1 ),
2114
2142
Integer .MAX_VALUE , CellUtil .cloneQualifier (kv ));
2115
2143
}
2116
2144
}
@@ -2130,13 +2158,14 @@ private com.alipay.oceanbase.rpc.mutation.Mutation buildMutation(Cell kv,
2130
2158
Cell newCell = kv ;
2131
2159
if (isTableGroup && family != null ) {
2132
2160
byte [] oldQualifier = CellUtil .cloneQualifier (kv );
2133
- byte [] newQualifier = new byte [family .length + 1 /* length of "." */ + oldQualifier .length ];
2161
+ byte [] newQualifier = new byte [family .length + 1 /* length of "." */
2162
+ + oldQualifier .length ];
2134
2163
System .arraycopy (family , 0 , newQualifier , 0 , family .length );
2135
2164
newQualifier [family .length ] = 0x2E ; // 0x2E in utf-8 is "."
2136
2165
System .arraycopy (oldQualifier , 0 , newQualifier , family .length + 1 , oldQualifier .length );
2137
2166
newCell = modifyQualifier (kv , newQualifier );
2138
2167
}
2139
- Cell .Type kvType = kv .getType ( );
2168
+ KeyValue .Type kvType = KeyValue . Type . codeToType ( kv .getTypeByte () );
2140
2169
switch (kvType ) {
2141
2170
case Put :
2142
2171
String [] propertyColumns = V_COLUMNS ;
@@ -2182,7 +2211,7 @@ private KeyValue modifyQualifier(Cell original, byte[] newQualifier) {
2182
2211
byte [] family = CellUtil .cloneFamily (original );
2183
2212
byte [] value = CellUtil .cloneValue (original );
2184
2213
long timestamp = original .getTimestamp ();
2185
- KeyValue .Type type = KeyValue .Type .codeToType (original .getType (). getCode ());
2214
+ KeyValue .Type type = KeyValue .Type .codeToType (original .getTypeByte ());
2186
2215
// Create a new KeyValue with the modified qualifier
2187
2216
return new KeyValue (row , family , newQualifier , timestamp , type , value );
2188
2217
}
@@ -2312,7 +2341,7 @@ private BatchOperation buildBatchOperation(String tableName, List<? extends Row>
2312
2341
public static ObTableOperation buildObTableOperation (Cell kv ,
2313
2342
ObTableOperationType operationType ,
2314
2343
Long TTL ) {
2315
- Cell .Type kvType = kv .getType ( );
2344
+ KeyValue .Type kvType = KeyValue . Type . codeToType ( kv .getTypeByte () );
2316
2345
String [] propertyColumns = V_COLUMNS ;
2317
2346
Object [] property = new Object [] { CellUtil .cloneValue (kv ) };
2318
2347
if (TTL != Long .MAX_VALUE ) {
@@ -2449,7 +2478,7 @@ public Pair<byte[][], byte[][]> getStartEndKeys() throws IOException {
2449
2478
return new Pair <>(getStartKeys (), getEndKeys ());
2450
2479
}
2451
2480
2452
- private CompareFilter .CompareOp getCompareOp (CompareOperator cmpOp ) {
2481
+ public static CompareFilter .CompareOp getCompareOp (CompareOperator cmpOp ) {
2453
2482
switch (cmpOp ) {
2454
2483
case LESS :
2455
2484
return CompareFilter .CompareOp .LESS ;
@@ -2467,96 +2496,4 @@ private CompareFilter.CompareOp getCompareOp(CompareOperator cmpOp) {
2467
2496
return CompareFilter .CompareOp .NO_OP ;
2468
2497
}
2469
2498
}
2470
-
2471
- private class ObCheckAndMutateBuilderImpl implements CheckAndMutateBuilder {
2472
- private final byte [] row ;
2473
- private final byte [] family ;
2474
- private byte [] qualifier ;
2475
- private byte [] value ;
2476
- private TimeRange timeRange ;
2477
- private CompareOperator cmpOp ;
2478
-
2479
- ObCheckAndMutateBuilderImpl (byte [] row , byte [] family ) {
2480
- this .row = checkNotNull (row , "The provided row is null." );
2481
- this .family = checkNotNull (family , "The provided family is null." );
2482
- }
2483
-
2484
- @ Override
2485
- public CheckAndMutateBuilder qualifier (byte [] qualifier ) {
2486
- this .qualifier = checkNotNull (
2487
- qualifier ,
2488
- "The provided qualifier is null. You could"
2489
- + " use an empty byte array, or do not call this method if you want a null qualifier." );
2490
- return this ;
2491
- }
2492
-
2493
- @ Override
2494
- public CheckAndMutateBuilder timeRange (TimeRange timeRange ) {
2495
- this .timeRange = timeRange ;
2496
- return this ;
2497
- }
2498
-
2499
- @ Override
2500
- public CheckAndMutateBuilder ifNotExists () {
2501
- this .cmpOp = CompareOperator .EQUAL ;
2502
- this .value = null ;
2503
- return this ;
2504
- }
2505
-
2506
- @ Override
2507
- public CheckAndMutateBuilder ifMatches (CompareOperator cmpOp , byte [] value ) {
2508
- this .cmpOp = checkNotNull (cmpOp , "The provided cmpOp is null." );
2509
- this .value = checkNotNull (value , "The provided value is null." );
2510
- return this ;
2511
- }
2512
-
2513
- @ Override
2514
- public boolean thenPut (Put put ) throws IOException {
2515
- checkCmpOp ();
2516
- RowMutations rowMutations = new RowMutations (row );
2517
- rowMutations .add (put );
2518
- try {
2519
- return checkAndMutation (row , family , qualifier , getCompareOp (cmpOp ), value ,
2520
- timeRange , rowMutations );
2521
- } catch (Exception e ) {
2522
- logger .error (LCD .convert ("01-00005" ), rowMutations , tableNameString , e );
2523
- throw new IOException ("checkAndMutate type table: " + tableNameString + " e.msg: "
2524
- + e .getMessage () + " error." , e );
2525
- }
2526
- }
2527
-
2528
- @ Override
2529
- public boolean thenDelete (Delete delete ) throws IOException {
2530
- checkCmpOp ();
2531
- RowMutations rowMutations = new RowMutations (row );
2532
- rowMutations .add (delete );
2533
- try {
2534
- return checkAndMutation (row , family , qualifier , getCompareOp (cmpOp ), value ,
2535
- timeRange , rowMutations );
2536
- } catch (Exception e ) {
2537
- logger .error (LCD .convert ("01-00005" ), rowMutations , tableNameString , e );
2538
- throw new IOException ("checkAndMutate type table: " + tableNameString + " e.msg: "
2539
- + e .getMessage () + " error." , e );
2540
- }
2541
- }
2542
-
2543
- @ Override
2544
- public boolean thenMutate (RowMutations mutation ) throws IOException {
2545
- checkCmpOp ();
2546
- try {
2547
- return checkAndMutation (row , family , qualifier , getCompareOp (cmpOp ), value ,
2548
- timeRange , mutation );
2549
- } catch (Exception e ) {
2550
- logger .error (LCD .convert ("01-00005" ), mutation , tableNameString , e );
2551
- throw new IOException ("checkAndMutate type table: " + tableNameString + " e.msg: "
2552
- + e .getMessage () + " error." , e );
2553
- }
2554
- }
2555
-
2556
- private void checkCmpOp () {
2557
- checkNotNull (this .cmpOp ,
2558
- "The compare condition is null. Please use"
2559
- + " ifNotExists/ifEquals/ifMatches before executing the request" );
2560
- }
2561
- }
2562
- }
2499
+ }
0 commit comments