File tree Expand file tree Collapse file tree 1 file changed +11
-1
lines changed Expand file tree Collapse file tree 1 file changed +11
-1
lines changed Original file line number Diff line number Diff line change @@ -15,7 +15,10 @@ use fail::fail_point;
15
15
use futures:: { prelude:: * , stream:: BoxStream } ;
16
16
use slog:: Logger ;
17
17
use std:: { iter, ops:: RangeBounds , sync:: Arc , time:: Instant } ;
18
- use tikv_client_proto:: { kvrpcpb, pdpb:: Timestamp } ;
18
+ use tikv_client_proto:: {
19
+ kvrpcpb:: { self , Op } ,
20
+ pdpb:: Timestamp ,
21
+ } ;
19
22
use tokio:: { sync:: RwLock , time:: Duration } ;
20
23
21
24
/// An undo-able set of actions on the dataset.
@@ -1055,6 +1058,11 @@ struct Committer<PdC: PdClient = PdRpcClient> {
1055
1058
}
1056
1059
1057
1060
impl < PdC : PdClient > Committer < PdC > {
1061
+ // CheckNotExists is checked in the prewrite phase and should not appear in the commit phase.
1062
+ fn filter_out_check_not_exists_mutations ( & mut self ) {
1063
+ self . mutations . retain ( |m| m. op ( ) != Op :: CheckNotExists )
1064
+ }
1065
+
1058
1066
async fn commit ( mut self ) -> Result < Option < Timestamp > > {
1059
1067
debug ! ( self . logger, "committing" ) ;
1060
1068
@@ -1067,6 +1075,8 @@ impl<PdC: PdClient> Committer<PdC> {
1067
1075
return Ok ( min_commit_ts) ;
1068
1076
}
1069
1077
1078
+ self . filter_out_check_not_exists_mutations ( ) ;
1079
+
1070
1080
let commit_ts = if self . options . async_commit {
1071
1081
// FIXME: min_commit_ts == 0 => fallback to normal 2PC
1072
1082
min_commit_ts. unwrap ( )
You can’t perform that action at this time.
0 commit comments