@@ -174,9 +174,18 @@ impl Buffer {
174
174
}
175
175
}
176
176
177
- /// Insert a value into the buffer (does not write through).
177
+ /// Put a value into the buffer (does not write through).
178
178
pub async fn put ( & mut self , key : Key , value : Value ) {
179
- self . insert_entry ( key, BufferEntry :: Put ( value) ) ;
179
+ let mut entry = self . entry_map . entry ( key. clone ( ) ) ;
180
+ match entry {
181
+ Entry :: Occupied ( ref mut o)
182
+ if matches ! ( o. get( ) , BufferEntry :: Insert ( _) )
183
+ || matches ! ( o. get( ) , BufferEntry :: CheckNotExist ) =>
184
+ {
185
+ o. insert ( BufferEntry :: Insert ( value) ) ;
186
+ }
187
+ _ => self . insert_entry ( key, BufferEntry :: Put ( value) ) ,
188
+ }
180
189
}
181
190
182
191
/// Mark a value as Insert mutation into the buffer (does not write through).
@@ -197,7 +206,9 @@ impl Buffer {
197
206
198
207
match entry {
199
208
Entry :: Occupied ( ref mut o)
200
- if matches ! ( o. get( ) , BufferEntry :: Insert ( _) ) && !is_pessimistic =>
209
+ if !is_pessimistic
210
+ && ( matches ! ( o. get( ) , BufferEntry :: Insert ( _) )
211
+ || matches ! ( o. get( ) , BufferEntry :: CheckNotExist ) ) =>
201
212
{
202
213
o. insert ( BufferEntry :: CheckNotExist ) ;
203
214
}
@@ -352,9 +363,9 @@ impl MutationValue {
352
363
mod tests {
353
364
use super :: * ;
354
365
use futures:: { executor:: block_on, future:: ready} ;
366
+ use tikv_client_common:: internal_err;
355
367
356
368
#[ tokio:: test]
357
- #[ allow( unreachable_code) ]
358
369
async fn set_and_get_from_buffer ( ) {
359
370
let mut buffer = Buffer :: new ( false ) ;
360
371
buffer
@@ -364,9 +375,13 @@ mod tests {
364
375
. put ( b"key2" . to_vec ( ) . into ( ) , b"value2" . to_vec ( ) )
365
376
. await ;
366
377
assert_eq ! (
367
- block_on( buffer. get_or_else( b"key1" . to_vec( ) . into( ) , move |_| ready( panic!( ) ) ) )
368
- . unwrap( )
369
- . unwrap( ) ,
378
+ block_on(
379
+ buffer. get_or_else( b"key1" . to_vec( ) . into( ) , move |_| ready( Err ( internal_err!(
380
+ ""
381
+ ) ) ) )
382
+ )
383
+ . unwrap( )
384
+ . unwrap( ) ,
370
385
b"value1" . to_vec( )
371
386
) ;
372
387
@@ -387,7 +402,6 @@ mod tests {
387
402
}
388
403
389
404
#[ tokio:: test]
390
- #[ allow( unreachable_code) ]
391
405
async fn insert_and_get_from_buffer ( ) {
392
406
let mut buffer = Buffer :: new ( false ) ;
393
407
buffer
@@ -397,9 +411,13 @@ mod tests {
397
411
. insert ( b"key2" . to_vec ( ) . into ( ) , b"value2" . to_vec ( ) )
398
412
. await ;
399
413
assert_eq ! (
400
- block_on( buffer. get_or_else( b"key1" . to_vec( ) . into( ) , move |_| ready( panic!( ) ) ) )
401
- . unwrap( )
402
- . unwrap( ) ,
414
+ block_on(
415
+ buffer. get_or_else( b"key1" . to_vec( ) . into( ) , move |_| ready( Err ( internal_err!(
416
+ ""
417
+ ) ) ) )
418
+ )
419
+ . unwrap( )
420
+ . unwrap( ) ,
403
421
b"value1" . to_vec( )
404
422
) ;
405
423
@@ -419,7 +437,6 @@ mod tests {
419
437
}
420
438
421
439
#[ test]
422
- #[ allow( unreachable_code) ]
423
440
fn repeat_reads_are_cached ( ) {
424
441
let k1: Key = b"key1" . to_vec ( ) . into ( ) ;
425
442
let k1_ = k1. clone ( ) ;
@@ -433,7 +450,7 @@ mod tests {
433
450
434
451
let mut buffer = Buffer :: new ( false ) ;
435
452
let r1 = block_on ( buffer. get_or_else ( k1. clone ( ) , move |_| ready ( Ok ( Some ( v1_) ) ) ) ) ;
436
- let r2 = block_on ( buffer. get_or_else ( k1. clone ( ) , move |_| ready ( panic ! ( ) ) ) ) ;
453
+ let r2 = block_on ( buffer. get_or_else ( k1. clone ( ) , move |_| ready ( Err ( internal_err ! ( "" ) ) ) ) ) ;
437
454
assert_eq ! ( r1. unwrap( ) . unwrap( ) , v1) ;
438
455
assert_eq ! ( r2. unwrap( ) . unwrap( ) , v1) ;
439
456
@@ -443,7 +460,7 @@ mod tests {
443
460
ready ( Ok ( vec ! [ ( k1_, v1__) . into( ) , ( k2_, v2_) . into( ) ] ) )
444
461
} ) ,
445
462
) ;
446
- let r2 = block_on ( buffer. get_or_else ( k2. clone ( ) , move |_| ready ( panic ! ( ) ) ) ) ;
463
+ let r2 = block_on ( buffer. get_or_else ( k2. clone ( ) , move |_| ready ( Err ( internal_err ! ( "" ) ) ) ) ) ;
447
464
let r3 = block_on (
448
465
buffer. batch_get_or_else ( vec ! [ k1. clone( ) , k2. clone( ) ] . into_iter ( ) , move |_| {
449
466
ready ( Ok ( vec ! [ ] ) )
@@ -462,4 +479,42 @@ mod tests {
462
479
vec![ KvPair ( k1, v1) , KvPair ( k2, v2) ]
463
480
) ;
464
481
}
482
+
483
+ // Check that multiple writes to the same key combine in the correct way.
484
+ #[ tokio:: test]
485
+ async fn state_machine ( ) {
486
+ let mut buffer = Buffer :: new ( false ) ;
487
+
488
+ macro_rules! assert_entry {
489
+ ( $key: ident, $p: pat) => {
490
+ assert!( matches!( buffer. entry_map. get( & $key) , Some ( & $p) , ) )
491
+ } ;
492
+ }
493
+
494
+ // Insert + Delete = CheckNotExists
495
+ let key: Key = b"key1" . to_vec ( ) . into ( ) ;
496
+ buffer. insert ( key. clone ( ) , b"value1" . to_vec ( ) ) . await ;
497
+ buffer. delete ( key. clone ( ) ) . await ;
498
+ assert_entry ! ( key, BufferEntry :: CheckNotExist ) ;
499
+
500
+ // CheckNotExists + Delete = CheckNotExists
501
+ buffer. delete ( key. clone ( ) ) . await ;
502
+ assert_entry ! ( key, BufferEntry :: CheckNotExist ) ;
503
+
504
+ // CheckNotExists + Put = Insert
505
+ buffer. put ( key. clone ( ) , b"value2" . to_vec ( ) ) . await ;
506
+ assert_entry ! ( key, BufferEntry :: Insert ( _) ) ;
507
+
508
+ // Insert + Put = Insert
509
+ let key: Key = b"key2" . to_vec ( ) . into ( ) ;
510
+ buffer. insert ( key. clone ( ) , b"value1" . to_vec ( ) ) . await ;
511
+ buffer. put ( key. clone ( ) , b"value2" . to_vec ( ) ) . await ;
512
+ assert_entry ! ( key, BufferEntry :: Insert ( _) ) ;
513
+
514
+ // Delete + Insert = Put
515
+ let key: Key = b"key3" . to_vec ( ) . into ( ) ;
516
+ buffer. delete ( key. clone ( ) ) . await ;
517
+ buffer. insert ( key. clone ( ) , b"value1" . to_vec ( ) ) . await ;
518
+ assert_entry ! ( key, BufferEntry :: Put ( _) ) ;
519
+ }
465
520
}
0 commit comments