@@ -154,7 +154,7 @@ public class OHTable implements Table {
154
154
/**
155
155
* the buffer of put request
156
156
*/
157
- private final ArrayList <Put > writeBuffer = new ArrayList <>();
157
+ private final ArrayList <Put > writeBuffer = new ArrayList <Put >();
158
158
/**
159
159
* when the put request reach the write buffer size the do put will
160
160
* flush commits automatically
@@ -1423,7 +1423,7 @@ private boolean checkAndMutation(byte[] row, byte[] family, byte[] qualifier,
1423
1423
"mutation family is not equal check family" );
1424
1424
}
1425
1425
byte [] filterString = buildCheckAndMutateFilterString (family , qualifier , compareOp , value );
1426
- ObHTableFilter filter = buildObHTableFilter (filterString , null , 1 , qualifier );
1426
+ ObHTableFilter filter = buildObHTableFilter (filterString , timeRange , 1 , qualifier );
1427
1427
ObTableQuery obTableQuery = buildObTableQuery (filter , row , true , row , true , false ,
1428
1428
new TimeRange ());
1429
1429
ObTableBatchOperation batch = buildObTableBatchOperation (mutations , null );
@@ -2004,69 +2004,98 @@ public static ObTableBatchOperation buildObTableBatchOperation(List<Mutation> ro
2004
2004
batch .setSamePropertiesNames (true );
2005
2005
return batch ;
2006
2006
}
2007
+
2007
2008
private QueryAndMutate buildDeleteQueryAndMutate (KeyValue kv ,
2008
2009
ObTableOperationType operationType ,
2009
- boolean isTableGroup , Long TTL ) {
2010
- KeyValue .Type kvType = KeyValue .Type .codeToType (kv .getType ());
2011
- com .alipay .oceanbase .rpc .mutation .Mutation tableMutation = buildMutation (kv , operationType , isTableGroup , TTL );
2010
+ boolean isTableGroup , byte [] family , Long TTL ) {
2011
+ KeyValue .Type kvType = KeyValue .Type .codeToType (kv .getType ().getCode ());
2012
+ com .alipay .oceanbase .rpc .mutation .Mutation tableMutation = buildMutation (kv , operationType ,
2013
+ isTableGroup , family , TTL );
2012
2014
ObNewRange range = new ObNewRange ();
2013
2015
ObTableQuery tableQuery = new ObTableQuery ();
2014
- tableQuery .setObKVParams (buildOBKVParams ((Scan )null ));
2016
+ tableQuery .setObKVParams (buildOBKVParams ((Scan ) null ));
2015
2017
ObHTableFilter filter = null ;
2016
2018
switch (kvType ) {
2017
2019
case Delete :
2018
2020
if (kv .getTimestamp () == Long .MAX_VALUE ) {
2019
- range .setStartKey (ObRowKey .getInstance (kv .getRow (), ObObj .getMin (), ObObj .getMin ()));
2020
- range .setEndKey (ObRowKey .getInstance (kv .getRow (), ObObj .getMax (), ObObj .getMax ()));
2021
- filter = buildObHTableFilter (null , null , 1 , kv .getQualifier ());
2022
- } else {
2023
- range .setStartKey (ObRowKey .getInstance (kv .getRow (), ObObj .getMin (),ObObj .getMin ()));
2024
- range .setEndKey (ObRowKey .getInstance (kv .getRow (), ObObj .getMax (), ObObj .getMax ()));
2025
- filter = buildObHTableFilter (null , new TimeRange (kv .getTimestamp (), kv .getTimestamp () + 1 ), 1 , kv .getQualifier ());
2021
+ range .setStartKey (ObRowKey .getInstance (CellUtil .cloneRow (kv ), ObObj .getMin (),
2022
+ ObObj .getMin ()));
2023
+ range .setEndKey (ObRowKey .getInstance (CellUtil .cloneRow (kv ), ObObj .getMax (),
2024
+ ObObj .getMax ()));
2025
+ filter = buildObHTableFilter (null , null , 1 , CellUtil .cloneQualifier (kv ));
2026
+ } else {
2027
+ range .setStartKey (ObRowKey .getInstance (CellUtil .cloneRow (kv ), ObObj .getMin (),
2028
+ ObObj .getMin ()));
2029
+ range .setEndKey (ObRowKey .getInstance (CellUtil .cloneRow (kv ), ObObj .getMax (),
2030
+ ObObj .getMax ()));
2031
+ filter = buildObHTableFilter (null ,
2032
+ new TimeRange (kv .getTimestamp (), kv .getTimestamp () + 1 ), 1 ,
2033
+ CellUtil .cloneQualifier (kv ));
2026
2034
}
2027
2035
break ;
2028
2036
case DeleteColumn :
2029
2037
if (kv .getTimestamp () == Long .MAX_VALUE ) {
2030
- range .setStartKey (ObRowKey .getInstance (kv .getRow (), ObObj .getMin (), ObObj .getMin ()));
2031
- range .setEndKey (ObRowKey .getInstance (kv .getRow (), ObObj .getMax (), ObObj .getMax ()));
2032
- filter = buildObHTableFilter (null , null , Integer .MAX_VALUE , kv .getQualifier ());
2038
+ range .setStartKey (ObRowKey .getInstance (CellUtil .cloneRow (kv ), ObObj .getMin (),
2039
+ ObObj .getMin ()));
2040
+ range .setEndKey (ObRowKey .getInstance (CellUtil .cloneRow (kv ), ObObj .getMax (),
2041
+ ObObj .getMax ()));
2042
+ filter = buildObHTableFilter (null , null , Integer .MAX_VALUE ,
2043
+ CellUtil .cloneQualifier (kv ));
2033
2044
} else {
2034
- range .setStartKey (ObRowKey .getInstance (kv .getRow (), ObObj .getMin (), ObObj .getMin ()));
2035
- range .setEndKey (ObRowKey .getInstance (kv .getRow (), ObObj .getMax (), ObObj .getMax ()));
2036
- filter = buildObHTableFilter (null , new TimeRange (0 , kv .getTimestamp () + 1 ), Integer .MAX_VALUE , kv .getQualifier ());
2045
+ range .setStartKey (ObRowKey .getInstance (CellUtil .cloneRow (kv ), ObObj .getMin (),
2046
+ ObObj .getMin ()));
2047
+ range .setEndKey (ObRowKey .getInstance (CellUtil .cloneRow (kv ), ObObj .getMax (),
2048
+ ObObj .getMax ()));
2049
+ filter = buildObHTableFilter (null , new TimeRange (0 , kv .getTimestamp () + 1 ),
2050
+ Integer .MAX_VALUE , CellUtil .cloneQualifier (kv ));
2037
2051
}
2038
2052
break ;
2039
2053
case DeleteFamily :
2040
2054
if (kv .getTimestamp () == Long .MAX_VALUE ) {
2041
- range .setStartKey (ObRowKey .getInstance (kv .getRow (), ObObj .getMin (), ObObj .getMin ()));
2042
- range .setEndKey (ObRowKey .getInstance (kv .getRow (), ObObj .getMax (), ObObj .getMax ()));
2055
+ range .setStartKey (ObRowKey .getInstance (CellUtil .cloneRow (kv ), ObObj .getMin (),
2056
+ ObObj .getMin ()));
2057
+ range .setEndKey (ObRowKey .getInstance (CellUtil .cloneRow (kv ), ObObj .getMax (),
2058
+ ObObj .getMax ()));
2043
2059
if (!isTableGroup ) {
2044
2060
filter = buildObHTableFilter (null , null , Integer .MAX_VALUE );
2045
2061
} else {
2046
- filter = buildObHTableFilter (null , null , Integer .MAX_VALUE , kv .getQualifier ());
2062
+ filter = buildObHTableFilter (null , null , Integer .MAX_VALUE ,
2063
+ CellUtil .cloneQualifier (kv ));
2047
2064
}
2048
2065
} else {
2049
- range .setStartKey (ObRowKey .getInstance (kv .getRow (), ObObj .getMin (), ObObj .getMin ()));
2050
- range .setEndKey (ObRowKey .getInstance (kv .getRow (), ObObj .getMax (), ObObj .getMax ()));
2066
+ range .setStartKey (ObRowKey .getInstance (CellUtil .cloneRow (kv ), ObObj .getMin (),
2067
+ ObObj .getMin ()));
2068
+ range .setEndKey (ObRowKey .getInstance (CellUtil .cloneRow (kv ), ObObj .getMax (),
2069
+ ObObj .getMax ()));
2051
2070
if (!isTableGroup ) {
2052
- filter = buildObHTableFilter (null , new TimeRange (0 , kv .getTimestamp () + 1 ), Integer .MAX_VALUE );
2071
+ filter = buildObHTableFilter (null , new TimeRange (0 , kv .getTimestamp () + 1 ),
2072
+ Integer .MAX_VALUE );
2053
2073
} else {
2054
- filter = buildObHTableFilter (null , new TimeRange (0 , kv .getTimestamp () + 1 ), Integer .MAX_VALUE , kv .getQualifier ());
2074
+ filter = buildObHTableFilter (null , new TimeRange (0 , kv .getTimestamp () + 1 ),
2075
+ Integer .MAX_VALUE , CellUtil .cloneQualifier (kv ));
2055
2076
}
2056
2077
}
2057
2078
break ;
2058
2079
case DeleteFamilyVersion :
2059
2080
if (kv .getTimestamp () == Long .MAX_VALUE ) {
2060
- range .setStartKey (ObRowKey .getInstance (kv .getRow (), ObObj .getMin (), ObObj .getMin ()));
2061
- range .setEndKey (ObRowKey .getInstance (kv .getRow (), ObObj .getMax (), ObObj .getMax ()));
2081
+ range .setStartKey (ObRowKey .getInstance (CellUtil .cloneRow (kv ), ObObj .getMin (),
2082
+ ObObj .getMin ()));
2083
+ range .setEndKey (ObRowKey .getInstance (CellUtil .cloneRow (kv ), ObObj .getMax (),
2084
+ ObObj .getMax ()));
2062
2085
filter = buildObHTableFilter (null , null , Integer .MAX_VALUE );
2063
2086
} else {
2064
- range .setStartKey (ObRowKey .getInstance (kv .getRow (), ObObj .getMin (), ObObj .getMin ()));
2065
- range .setEndKey (ObRowKey .getInstance (kv .getRow (), ObObj .getMax (), ObObj .getMax ()));
2087
+ range .setStartKey (ObRowKey .getInstance (CellUtil .cloneRow (kv ), ObObj .getMin (),
2088
+ ObObj .getMin ()));
2089
+ range .setEndKey (ObRowKey .getInstance (CellUtil .cloneRow (kv ), ObObj .getMax (),
2090
+ ObObj .getMax ()));
2066
2091
if (!isTableGroup ) {
2067
- filter = buildObHTableFilter (null , new TimeRange (kv .getTimestamp (), kv .getTimestamp () + 1 ), Integer .MAX_VALUE );
2092
+ filter = buildObHTableFilter (null ,
2093
+ new TimeRange (kv .getTimestamp (), kv .getTimestamp () + 1 ),
2094
+ Integer .MAX_VALUE );
2068
2095
} else {
2069
- filter = buildObHTableFilter (null , new TimeRange (kv .getTimestamp (), kv .getTimestamp () + 1 ), Integer .MAX_VALUE , kv .getQualifier ());
2096
+ filter = buildObHTableFilter (null ,
2097
+ new TimeRange (kv .getTimestamp (), kv .getTimestamp () + 1 ),
2098
+ Integer .MAX_VALUE , CellUtil .cloneQualifier (kv ));
2070
2099
}
2071
2100
}
2072
2101
break ;
@@ -2078,7 +2107,6 @@ private QueryAndMutate buildDeleteQueryAndMutate(KeyValue kv,
2078
2107
return new QueryAndMutate (tableQuery , tableMutation );
2079
2108
}
2080
2109
2081
-
2082
2110
private com .alipay .oceanbase .rpc .mutation .Mutation buildMutation (Cell kv ,
2083
2111
ObTableOperationType operationType ,
2084
2112
boolean isTableGroup ,
@@ -2212,14 +2240,14 @@ private BatchOperation buildBatchOperation(String tableName, List<? extends Row>
2212
2240
singleOpResultNum ++;
2213
2241
if (disExec ) {
2214
2242
KeyValue kv = new KeyValue (delete .getRow (), delete .getTimeStamp (),
2215
- KeyValue .Type .Maximum );
2216
- com .alipay .oceanbase .rpc .mutation .Mutation tableMutation = buildMutation (kv , DEL , isTableGroup , Long .MAX_VALUE );
2243
+ KeyValue .Type .DeleteFamily );
2244
+ com .alipay .oceanbase .rpc .mutation .Mutation tableMutation = buildMutation (kv , DEL , isTableGroup , null , Long .MAX_VALUE );
2217
2245
ObNewRange range = new ObNewRange ();
2218
2246
ObTableQuery tableQuery = new ObTableQuery ();
2219
2247
ObHTableFilter filter ;
2220
2248
tableQuery .setObKVParams (buildOBKVParams ((Scan ) null ));
2221
- range .setStartKey (ObRowKey .getInstance (kv . getRow ( ), ObObj .getMin (), ObObj .getMin ()));
2222
- range .setEndKey (ObRowKey .getInstance (kv . getRow ( ), ObObj .getMax (), ObObj .getMax ()));
2249
+ range .setStartKey (ObRowKey .getInstance (CellUtil . cloneRow ( kv ), ObObj .getMin (), ObObj .getMin ()));
2250
+ range .setEndKey (ObRowKey .getInstance (CellUtil . cloneRow ( kv ), ObObj .getMax (), ObObj .getMax ()));
2223
2251
if (kv .getTimestamp () == Long .MAX_VALUE ) {
2224
2252
filter = buildObHTableFilter (null , null , Integer .MAX_VALUE );
2225
2253
} else {
@@ -2231,31 +2259,21 @@ private BatchOperation buildBatchOperation(String tableName, List<? extends Row>
2231
2259
tableMutation .setTable (tableName );
2232
2260
batch .addOperation (new QueryAndMutate (tableQuery , tableMutation ));
2233
2261
} else {
2234
- KeyValue kv = new KeyValue (delete .getRow (), delete .getTimeStamp (),
2235
- KeyValue .Type .Maximum );
2236
- batch .addOperation (buildMutation (kv , DEL , isTableGroup , Long .MAX_VALUE ));
2262
+ KeyValue kv = new KeyValue (delete .getRow (), delete .getTimeStamp ());
2263
+ batch .addOperation (com .alipay .oceanbase .rpc .mutation .Mutation .getInstance (DEL , ROW_KEY_COLUMNS ,
2264
+ new Object [] { CellUtil .cloneRow (kv ), null , -kv .getTimestamp () },
2265
+ null , null ));
2237
2266
}
2238
2267
} else {
2239
2268
for (Map .Entry <byte [], List <Cell >> entry : delete .getFamilyCellMap ().entrySet ()) {
2240
2269
byte [] family = entry .getKey ();
2241
2270
List <Cell > keyValueList = entry .getValue ();
2242
2271
for (Cell kv : keyValueList ) {
2243
2272
singleOpResultNum ++;
2244
- if (isTableGroup ) {
2245
- KeyValue new_kv = modifyQualifier (kv ,
2246
- (Bytes .toString (family ) + "." + Bytes .toString (kv
2247
- .getQualifier ())).getBytes ());
2248
- if (disExec ) {
2249
- batch .addOperation (buildDeleteQueryAndMutate (new_kv , DEL , true , Long .MAX_VALUE ));
2250
- } else {
2251
- batch .addOperation (buildMutation (new_kv , DEL , true , Long .MAX_VALUE ));
2252
- }
2273
+ if (disExec ) {
2274
+ batch .addOperation (buildDeleteQueryAndMutate ((KeyValue ) kv , DEL , false , family , Long .MAX_VALUE ));
2253
2275
} else {
2254
- if (disExec ) {
2255
- batch .addOperation (buildDeleteQueryAndMutate (kv , DEL , false , Long .MAX_VALUE ));
2256
- } else {
2257
- batch .addOperation (buildMutation (kv , DEL , false , Long .MAX_VALUE ));
2258
- }
2276
+ batch .addOperation (buildMutation (kv , DEL , isTableGroup , family , Long .MAX_VALUE ));
2259
2277
}
2260
2278
}
2261
2279
}
0 commit comments