Skip to content

Commit 77fbe89

Browse files
authored
Merge branch 'master' into 1pc-panic
2 parents 517bdb1 + 0535be6 commit 77fbe89

File tree

4 files changed

+108
-65
lines changed

4 files changed

+108
-65
lines changed

.travis.yml

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -48,8 +48,8 @@ script:
4848
# For now we only run full integration tests on Linux. Here's why:
4949
# * Docker on OS X is not supported by Travis.
5050
# * Docker on Windows seems to not have the correct binary at `"/c/Program Files/Docker/Docker/DockerCli.exe" to switch it to Linux containers.
51-
- if [[ $TRAVIS_OS_NAME == "linux" && $TRAVIS_RUST_VERSION == "nightly" ]]; then docker run -d --net=host --name pd --rm pingcap/pd:nightly --name "pd" --data-dir "pd" --client-urls "http://127.0.0.1:2379" --advertise-client-urls "http://127.0.0.1:2379"; fi
52-
- if [[ $TRAVIS_OS_NAME == "linux" && $TRAVIS_RUST_VERSION == "nightly" ]]; then docker run -d --net=host --name kv --rm --ulimit nofile=90000:90000 pingcap/tikv:nightly --pd-endpoints "127.0.0.1:2379" --addr "127.0.0.1:2378" --data-dir "kv"; fi
51+
- if [[ $TRAVIS_OS_NAME == "linux" && $TRAVIS_RUST_VERSION == "nightly" ]]; then docker run -d --net=host --name pd --rm pingcap/pd --name "pd" --data-dir "pd" --client-urls "http://127.0.0.1:2379" --advertise-client-urls "http://127.0.0.1:2379"; fi
52+
- if [[ $TRAVIS_OS_NAME == "linux" && $TRAVIS_RUST_VERSION == "nightly" ]]; then docker run -d --net=host --name kv --rm --ulimit nofile=90000:90000 pingcap/tikv --pd-endpoints "127.0.0.1:2379" --addr "127.0.0.1:2378" --data-dir "kv"; fi
5353
- if [[ $TRAVIS_OS_NAME == "linux" && $TRAVIS_RUST_VERSION == "nightly" ]]; then docker ps; fi
5454
- if [[ $TRAVIS_OS_NAME == "linux" && $TRAVIS_RUST_VERSION == "nightly" ]]; then docker logs pd; fi
5555
- if [[ $TRAVIS_OS_NAME == "linux" && $TRAVIS_RUST_VERSION == "nightly" ]]; then docker logs kv; fi

README.md

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -8,7 +8,7 @@ This crate provides an easy-to-use client for [TiKV](https://github.com/tikv/tik
88

99
This crate lets you connect to a TiKV cluster and use either a transactional or raw (simple get/put style without transactional consistency guarantees) API to access and update your data.
1010

11-
This is an open source (Apache 2) project maintained by the TiKV Authors. We welcome community contributions, see below for more info.
11+
The TiKV Rust client is an open source (Apache 2) project maintained by the TiKV Authors. We welcome contributions, see below for more info.
1212

1313
## Getting started
1414

@@ -19,7 +19,7 @@ The TiKV client is a Rust library (crate). To use this crate in your project, ad
1919
tikv-client = 0.1
2020
```
2121

22-
Note, that you will need `tikv-client = { git = "https://github.com/tikv/client-rust.git" }` until we publish the crate (should be any day now).
22+
Note, you will need `tikv-client = { git = "https://github.com/tikv/client-rust.git" }` until we publish the crate (should be any day now).
2323

2424
The minimum supported version of Rust is 1.40. The minimum supported version of TiKV is 5.0.
2525

src/transaction/buffer.rs

Lines changed: 92 additions & 47 deletions
Original file line numberDiff line numberDiff line change
@@ -24,19 +24,19 @@ impl Buffer {
2424
}
2525

2626
/// Get the primary key of the buffer.
27-
pub async fn get_primary_key(&self) -> Option<Key> {
27+
pub fn get_primary_key(&self) -> Option<Key> {
2828
self.primary_key.clone()
2929
}
3030

3131
/// Set the primary key if it is not set
32-
pub async fn primary_key_or(&mut self, key: &Key) {
32+
pub fn primary_key_or(&mut self, key: &Key) {
3333
self.primary_key.get_or_insert_with(|| key.clone());
3434
}
3535

3636
/// Get a value from the buffer.
3737
/// If the returned value is None, it means the key doesn't exist in buffer yet.
38-
pub async fn get(&self, key: &Key) -> Option<Value> {
39-
match self.get_from_mutations(key).await {
38+
pub fn get(&self, key: &Key) -> Option<Value> {
39+
match self.get_from_mutations(key) {
4040
MutationValue::Determined(value) => value,
4141
MutationValue::Undetermined => None,
4242
}
@@ -49,7 +49,7 @@ impl Buffer {
4949
F: FnOnce(Key) -> Fut,
5050
Fut: Future<Output = Result<Option<Value>>>,
5151
{
52-
match self.get_from_mutations(&key).await {
52+
match self.get_from_mutations(&key) {
5353
MutationValue::Determined(value) => Ok(value),
5454
MutationValue::Undetermined => {
5555
let value = f(key.clone()).await?;
@@ -161,7 +161,7 @@ impl Buffer {
161161
}
162162

163163
/// Lock the given key if necessary.
164-
pub async fn lock(&mut self, key: Key) {
164+
pub fn lock(&mut self, key: Key) {
165165
self.primary_key.get_or_insert_with(|| key.clone());
166166
let value = self
167167
.entry_map
@@ -174,13 +174,22 @@ impl Buffer {
174174
}
175175
}
176176

177-
/// Insert a value into the buffer (does not write through).
178-
pub async fn put(&mut self, key: Key, value: Value) {
179-
self.insert_entry(key, BufferEntry::Put(value));
177+
/// Put a value into the buffer (does not write through).
178+
pub fn put(&mut self, key: Key, value: Value) {
179+
let mut entry = self.entry_map.entry(key.clone());
180+
match entry {
181+
Entry::Occupied(ref mut o)
182+
if matches!(o.get(), BufferEntry::Insert(_))
183+
|| matches!(o.get(), BufferEntry::CheckNotExist) =>
184+
{
185+
o.insert(BufferEntry::Insert(value));
186+
}
187+
_ => self.insert_entry(key, BufferEntry::Put(value)),
188+
}
180189
}
181190

182191
/// Mark a value as Insert mutation into the buffer (does not write through).
183-
pub async fn insert(&mut self, key: Key, value: Value) {
192+
pub fn insert(&mut self, key: Key, value: Value) {
184193
let mut entry = self.entry_map.entry(key.clone());
185194
match entry {
186195
Entry::Occupied(ref mut o) if matches!(o.get(), BufferEntry::Del) => {
@@ -191,13 +200,15 @@ impl Buffer {
191200
}
192201

193202
/// Mark a value as deleted.
194-
pub async fn delete(&mut self, key: Key) {
203+
pub fn delete(&mut self, key: Key) {
195204
let is_pessimistic = self.is_pessimistic;
196205
let mut entry = self.entry_map.entry(key.clone());
197206

198207
match entry {
199208
Entry::Occupied(ref mut o)
200-
if matches!(o.get(), BufferEntry::Insert(_)) && !is_pessimistic =>
209+
if !is_pessimistic
210+
&& (matches!(o.get(), BufferEntry::Insert(_))
211+
|| matches!(o.get(), BufferEntry::CheckNotExist)) =>
201212
{
202213
o.insert(BufferEntry::CheckNotExist);
203214
}
@@ -206,14 +217,14 @@ impl Buffer {
206217
}
207218

208219
/// Converts the buffered mutations to the proto buffer version
209-
pub async fn to_proto_mutations(&self) -> Vec<kvrpcpb::Mutation> {
220+
pub fn to_proto_mutations(&self) -> Vec<kvrpcpb::Mutation> {
210221
self.entry_map
211222
.iter()
212223
.filter_map(|(key, mutation)| mutation.to_proto_with_key(key))
213224
.collect()
214225
}
215226

216-
async fn get_from_mutations(&self, key: &Key) -> MutationValue {
227+
fn get_from_mutations(&self, key: &Key) -> MutationValue {
217228
self.entry_map
218229
.get(&key)
219230
.map(BufferEntry::get_value)
@@ -352,26 +363,26 @@ impl MutationValue {
352363
mod tests {
353364
use super::*;
354365
use futures::{executor::block_on, future::ready};
366+
use tikv_client_common::internal_err;
355367

356-
#[tokio::test]
357-
#[allow(unreachable_code)]
358-
async fn set_and_get_from_buffer() {
368+
#[test]
369+
fn set_and_get_from_buffer() {
359370
let mut buffer = Buffer::new(false);
360-
buffer
361-
.put(b"key1".to_vec().into(), b"value1".to_vec())
362-
.await;
363-
buffer
364-
.put(b"key2".to_vec().into(), b"value2".to_vec())
365-
.await;
371+
buffer.put(b"key1".to_vec().into(), b"value1".to_vec());
372+
buffer.put(b"key2".to_vec().into(), b"value2".to_vec());
366373
assert_eq!(
367-
block_on(buffer.get_or_else(b"key1".to_vec().into(), move |_| ready(panic!())))
368-
.unwrap()
369-
.unwrap(),
374+
block_on(
375+
buffer.get_or_else(b"key1".to_vec().into(), move |_| ready(Err(internal_err!(
376+
""
377+
))))
378+
)
379+
.unwrap()
380+
.unwrap(),
370381
b"value1".to_vec()
371382
);
372383

373-
buffer.delete(b"key2".to_vec().into()).await;
374-
buffer.put(b"key1".to_vec().into(), b"value".to_vec()).await;
384+
buffer.delete(b"key2".to_vec().into());
385+
buffer.put(b"key1".to_vec().into(), b"value".to_vec());
375386
assert_eq!(
376387
block_on(buffer.batch_get_or_else(
377388
vec![b"key2".to_vec().into(), b"key1".to_vec().into()].into_iter(),
@@ -386,27 +397,24 @@ mod tests {
386397
);
387398
}
388399

389-
#[tokio::test]
390-
#[allow(unreachable_code)]
391-
async fn insert_and_get_from_buffer() {
400+
#[test]
401+
fn insert_and_get_from_buffer() {
392402
let mut buffer = Buffer::new(false);
393-
buffer
394-
.insert(b"key1".to_vec().into(), b"value1".to_vec())
395-
.await;
396-
buffer
397-
.insert(b"key2".to_vec().into(), b"value2".to_vec())
398-
.await;
403+
buffer.insert(b"key1".to_vec().into(), b"value1".to_vec());
404+
buffer.insert(b"key2".to_vec().into(), b"value2".to_vec());
399405
assert_eq!(
400-
block_on(buffer.get_or_else(b"key1".to_vec().into(), move |_| ready(panic!())))
401-
.unwrap()
402-
.unwrap(),
406+
block_on(
407+
buffer.get_or_else(b"key1".to_vec().into(), move |_| ready(Err(internal_err!(
408+
""
409+
))))
410+
)
411+
.unwrap()
412+
.unwrap(),
403413
b"value1".to_vec()
404414
);
405415

406-
buffer.delete(b"key2".to_vec().into()).await;
407-
buffer
408-
.insert(b"key1".to_vec().into(), b"value".to_vec())
409-
.await;
416+
buffer.delete(b"key2".to_vec().into());
417+
buffer.insert(b"key1".to_vec().into(), b"value".to_vec());
410418
assert_eq!(
411419
block_on(buffer.batch_get_or_else(
412420
vec![b"key2".to_vec().into(), b"key1".to_vec().into()].into_iter(),
@@ -419,7 +427,6 @@ mod tests {
419427
}
420428

421429
#[test]
422-
#[allow(unreachable_code)]
423430
fn repeat_reads_are_cached() {
424431
let k1: Key = b"key1".to_vec().into();
425432
let k1_ = k1.clone();
@@ -433,7 +440,7 @@ mod tests {
433440

434441
let mut buffer = Buffer::new(false);
435442
let r1 = block_on(buffer.get_or_else(k1.clone(), move |_| ready(Ok(Some(v1_)))));
436-
let r2 = block_on(buffer.get_or_else(k1.clone(), move |_| ready(panic!())));
443+
let r2 = block_on(buffer.get_or_else(k1.clone(), move |_| ready(Err(internal_err!("")))));
437444
assert_eq!(r1.unwrap().unwrap(), v1);
438445
assert_eq!(r2.unwrap().unwrap(), v1);
439446

@@ -443,7 +450,7 @@ mod tests {
443450
ready(Ok(vec![(k1_, v1__).into(), (k2_, v2_).into()]))
444451
}),
445452
);
446-
let r2 = block_on(buffer.get_or_else(k2.clone(), move |_| ready(panic!())));
453+
let r2 = block_on(buffer.get_or_else(k2.clone(), move |_| ready(Err(internal_err!("")))));
447454
let r3 = block_on(
448455
buffer.batch_get_or_else(vec![k1.clone(), k2.clone()].into_iter(), move |_| {
449456
ready(Ok(vec![]))
@@ -462,4 +469,42 @@ mod tests {
462469
vec![KvPair(k1, v1), KvPair(k2, v2)]
463470
);
464471
}
472+
473+
// Check that multiple writes to the same key combine in the correct way.
474+
#[test]
475+
fn state_machine() {
476+
let mut buffer = Buffer::new(false);
477+
478+
macro_rules! assert_entry {
479+
($key: ident, $p: pat) => {
480+
assert!(matches!(buffer.entry_map.get(&$key), Some(&$p),))
481+
};
482+
}
483+
484+
// Insert + Delete = CheckNotExists
485+
let key: Key = b"key1".to_vec().into();
486+
buffer.insert(key.clone(), b"value1".to_vec());
487+
buffer.delete(key.clone());
488+
assert_entry!(key, BufferEntry::CheckNotExist);
489+
490+
// CheckNotExists + Delete = CheckNotExists
491+
buffer.delete(key.clone());
492+
assert_entry!(key, BufferEntry::CheckNotExist);
493+
494+
// CheckNotExists + Put = Insert
495+
buffer.put(key.clone(), b"value2".to_vec());
496+
assert_entry!(key, BufferEntry::Insert(_));
497+
498+
// Insert + Put = Insert
499+
let key: Key = b"key2".to_vec().into();
500+
buffer.insert(key.clone(), b"value1".to_vec());
501+
buffer.put(key.clone(), b"value2".to_vec());
502+
assert_entry!(key, BufferEntry::Insert(_));
503+
504+
// Delete + Insert = Put
505+
let key: Key = b"key3".to_vec().into();
506+
buffer.delete(key.clone());
507+
buffer.insert(key.clone(), b"value1".to_vec());
508+
assert_entry!(key, BufferEntry::Put(_));
509+
}
465510
}

src/transaction/transaction.rs

Lines changed: 12 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -398,7 +398,7 @@ impl<PdC: PdClient> Transaction<PdC> {
398398
self.pessimistic_lock(iter::once(key.clone()), false)
399399
.await?;
400400
}
401-
self.buffer.put(key, value.into()).await;
401+
self.buffer.put(key, value.into());
402402
Ok(())
403403
}
404404

@@ -424,7 +424,7 @@ impl<PdC: PdClient> Transaction<PdC> {
424424
pub async fn insert(&mut self, key: impl Into<Key>, value: impl Into<Value>) -> Result<()> {
425425
self.check_allow_operation().await?;
426426
let key = key.into();
427-
if self.buffer.get(&key).await.is_some() {
427+
if self.buffer.get(&key).is_some() {
428428
return Err(Error::DuplicateKeyInsertion);
429429
}
430430
if self.is_pessimistic() {
@@ -434,7 +434,7 @@ impl<PdC: PdClient> Transaction<PdC> {
434434
)
435435
.await?;
436436
}
437-
self.buffer.insert(key, value.into()).await;
437+
self.buffer.insert(key, value.into());
438438
Ok(())
439439
}
440440

@@ -462,7 +462,7 @@ impl<PdC: PdClient> Transaction<PdC> {
462462
self.pessimistic_lock(iter::once(key.clone()), false)
463463
.await?;
464464
}
465-
self.buffer.delete(key).await;
465+
self.buffer.delete(key);
466466
Ok(())
467467
}
468468

@@ -497,7 +497,7 @@ impl<PdC: PdClient> Transaction<PdC> {
497497
match self.options.kind {
498498
TransactionKind::Optimistic => {
499499
for key in keys {
500-
self.buffer.lock(key.into()).await;
500+
self.buffer.lock(key.into());
501501
}
502502
}
503503
TransactionKind::Pessimistic(_) => {
@@ -535,8 +535,8 @@ impl<PdC: PdClient> Transaction<PdC> {
535535
*status = TransactionStatus::StartedCommit;
536536
}
537537

538-
let primary_key = self.buffer.get_primary_key().await;
539-
let mutations = self.buffer.to_proto_mutations().await;
538+
let primary_key = self.buffer.get_primary_key();
539+
let mutations = self.buffer.to_proto_mutations();
540540
if mutations.is_empty() {
541541
assert!(primary_key.is_none());
542542
return Ok(None);
@@ -595,8 +595,8 @@ impl<PdC: PdClient> Transaction<PdC> {
595595
*status = TransactionStatus::StartedRollback;
596596
}
597597

598-
let primary_key = self.buffer.get_primary_key().await;
599-
let mutations = self.buffer.to_proto_mutations().await;
598+
let primary_key = self.buffer.get_primary_key();
599+
let mutations = self.buffer.to_proto_mutations();
600600
let res = Committer::new(
601601
primary_key,
602602
mutations,
@@ -619,7 +619,7 @@ impl<PdC: PdClient> Transaction<PdC> {
619619
/// Returns the TTL set on the transaction's locks by TiKV.
620620
pub async fn send_heart_beat(&mut self) -> Result<u64> {
621621
self.check_allow_operation().await?;
622-
let primary_key = match self.buffer.get_primary_key().await {
622+
let primary_key = match self.buffer.get_primary_key() {
623623
Some(k) => k,
624624
None => return Err(Error::NoPrimaryKey),
625625
};
@@ -695,7 +695,6 @@ impl<PdC: PdClient> Transaction<PdC> {
695695
let primary_lock = self
696696
.buffer
697697
.get_primary_key()
698-
.await
699698
.unwrap_or_else(|| first_key.clone());
700699
let for_update_ts = self.rpc.clone().get_timestamp().await?;
701700
self.options.push_for_update_ts(for_update_ts.clone());
@@ -717,12 +716,12 @@ impl<PdC: PdClient> Transaction<PdC> {
717716
let pairs = plan.execute().await;
718717

719718
// primary key will be set here if needed
720-
self.buffer.primary_key_or(&first_key).await;
719+
self.buffer.primary_key_or(&first_key);
721720

722721
self.start_auto_heartbeat().await;
723722

724723
for key in keys {
725-
self.buffer.lock(key.key()).await;
724+
self.buffer.lock(key.key());
726725
}
727726

728727
pairs
@@ -755,7 +754,6 @@ impl<PdC: PdClient> Transaction<PdC> {
755754
let primary_key = self
756755
.buffer
757756
.get_primary_key()
758-
.await
759757
.expect("Primary key should exist");
760758
let start_ts = self.timestamp.clone();
761759
let region_backoff = self.options.retry_options.region_backoff.clone();

0 commit comments

Comments
 (0)