Skip to content

Commit bace9d9

Browse files
committed
De-async buffer
Signed-off-by: Nick Cameron <nrc@ncameron.org>
1 parent b2ac690 commit bace9d9

File tree

3 files changed

+47
-59
lines changed

3 files changed

+47
-59
lines changed

README.md

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -8,7 +8,7 @@ This crate provides an easy-to-use client for [TiKV](https://github.com/tikv/tik
88

99
This crate lets you connect to a TiKV cluster and use either a transactional or raw (simple get/put style without transactional consistency guarantees) API to access and update your data.
1010

11-
This is an open source (Apache 2) project maintained by the TiKV Authors. We welcome community contributions, see below for more info.
11+
The TiKV Rust client is an open source (Apache 2) project maintained by the TiKV Authors. We welcome contributions, see below for more info.
1212

1313
## Getting started
1414

@@ -19,7 +19,7 @@ The TiKV client is a Rust library (crate). To use this crate in your project, ad
1919
tikv-client = 0.1
2020
```
2121

22-
Note, that you will need `tikv-client = { git = "https://github.com/tikv/client-rust.git" }` until we publish the crate (should be any day now).
22+
Note, you will need `tikv-client = { git = "https://github.com/tikv/client-rust.git" }` until we publish the crate (should be any day now).
2323

2424
The minimum supported version of Rust is 1.40. The minimum supported version of TiKV is 5.0.
2525

src/transaction/buffer.rs

Lines changed: 33 additions & 43 deletions
Original file line numberDiff line numberDiff line change
@@ -24,19 +24,19 @@ impl Buffer {
2424
}
2525

2626
/// Get the primary key of the buffer.
27-
pub async fn get_primary_key(&self) -> Option<Key> {
27+
pub fn get_primary_key(&self) -> Option<Key> {
2828
self.primary_key.clone()
2929
}
3030

3131
/// Set the primary key if it is not set
32-
pub async fn primary_key_or(&mut self, key: &Key) {
32+
pub fn primary_key_or(&mut self, key: &Key) {
3333
self.primary_key.get_or_insert_with(|| key.clone());
3434
}
3535

3636
/// Get a value from the buffer.
3737
/// If the returned value is None, it means the key doesn't exist in buffer yet.
38-
pub async fn get(&self, key: &Key) -> Option<Value> {
39-
match self.get_from_mutations(key).await {
38+
pub fn get(&self, key: &Key) -> Option<Value> {
39+
match self.get_from_mutations(key) {
4040
MutationValue::Determined(value) => value,
4141
MutationValue::Undetermined => None,
4242
}
@@ -49,7 +49,7 @@ impl Buffer {
4949
F: FnOnce(Key) -> Fut,
5050
Fut: Future<Output = Result<Option<Value>>>,
5151
{
52-
match self.get_from_mutations(&key).await {
52+
match self.get_from_mutations(&key) {
5353
MutationValue::Determined(value) => Ok(value),
5454
MutationValue::Undetermined => {
5555
let value = f(key.clone()).await?;
@@ -161,7 +161,7 @@ impl Buffer {
161161
}
162162

163163
/// Lock the given key if necessary.
164-
pub async fn lock(&mut self, key: Key) {
164+
pub fn lock(&mut self, key: Key) {
165165
self.primary_key.get_or_insert_with(|| key.clone());
166166
let value = self
167167
.entry_map
@@ -175,7 +175,7 @@ impl Buffer {
175175
}
176176

177177
/// Put a value into the buffer (does not write through).
178-
pub async fn put(&mut self, key: Key, value: Value) {
178+
pub fn put(&mut self, key: Key, value: Value) {
179179
let mut entry = self.entry_map.entry(key.clone());
180180
match entry {
181181
Entry::Occupied(ref mut o)
@@ -189,7 +189,7 @@ impl Buffer {
189189
}
190190

191191
/// Mark a value as Insert mutation into the buffer (does not write through).
192-
pub async fn insert(&mut self, key: Key, value: Value) {
192+
pub fn insert(&mut self, key: Key, value: Value) {
193193
let mut entry = self.entry_map.entry(key.clone());
194194
match entry {
195195
Entry::Occupied(ref mut o) if matches!(o.get(), BufferEntry::Del) => {
@@ -200,7 +200,7 @@ impl Buffer {
200200
}
201201

202202
/// Mark a value as deleted.
203-
pub async fn delete(&mut self, key: Key) {
203+
pub fn delete(&mut self, key: Key) {
204204
let is_pessimistic = self.is_pessimistic;
205205
let mut entry = self.entry_map.entry(key.clone());
206206

@@ -217,14 +217,14 @@ impl Buffer {
217217
}
218218

219219
/// Converts the buffered mutations to the proto buffer version
220-
pub async fn to_proto_mutations(&self) -> Vec<kvrpcpb::Mutation> {
220+
pub fn to_proto_mutations(&self) -> Vec<kvrpcpb::Mutation> {
221221
self.entry_map
222222
.iter()
223223
.filter_map(|(key, mutation)| mutation.to_proto_with_key(key))
224224
.collect()
225225
}
226226

227-
async fn get_from_mutations(&self, key: &Key) -> MutationValue {
227+
fn get_from_mutations(&self, key: &Key) -> MutationValue {
228228
self.entry_map
229229
.get(&key)
230230
.map(BufferEntry::get_value)
@@ -365,15 +365,11 @@ mod tests {
365365
use futures::{executor::block_on, future::ready};
366366
use tikv_client_common::internal_err;
367367

368-
#[tokio::test]
369-
async fn set_and_get_from_buffer() {
368+
#[test]
369+
fn set_and_get_from_buffer() {
370370
let mut buffer = Buffer::new(false);
371-
buffer
372-
.put(b"key1".to_vec().into(), b"value1".to_vec())
373-
.await;
374-
buffer
375-
.put(b"key2".to_vec().into(), b"value2".to_vec())
376-
.await;
371+
buffer.put(b"key1".to_vec().into(), b"value1".to_vec());
372+
buffer.put(b"key2".to_vec().into(), b"value2".to_vec());
377373
assert_eq!(
378374
block_on(
379375
buffer.get_or_else(b"key1".to_vec().into(), move |_| ready(Err(internal_err!(
@@ -385,8 +381,8 @@ mod tests {
385381
b"value1".to_vec()
386382
);
387383

388-
buffer.delete(b"key2".to_vec().into()).await;
389-
buffer.put(b"key1".to_vec().into(), b"value".to_vec()).await;
384+
buffer.delete(b"key2".to_vec().into());
385+
buffer.put(b"key1".to_vec().into(), b"value".to_vec());
390386
assert_eq!(
391387
block_on(buffer.batch_get_or_else(
392388
vec![b"key2".to_vec().into(), b"key1".to_vec().into()].into_iter(),
@@ -401,15 +397,11 @@ mod tests {
401397
);
402398
}
403399

404-
#[tokio::test]
405-
async fn insert_and_get_from_buffer() {
400+
#[test]
401+
fn insert_and_get_from_buffer() {
406402
let mut buffer = Buffer::new(false);
407-
buffer
408-
.insert(b"key1".to_vec().into(), b"value1".to_vec())
409-
.await;
410-
buffer
411-
.insert(b"key2".to_vec().into(), b"value2".to_vec())
412-
.await;
403+
buffer.insert(b"key1".to_vec().into(), b"value1".to_vec());
404+
buffer.insert(b"key2".to_vec().into(), b"value2".to_vec());
413405
assert_eq!(
414406
block_on(
415407
buffer.get_or_else(b"key1".to_vec().into(), move |_| ready(Err(internal_err!(
@@ -421,10 +413,8 @@ mod tests {
421413
b"value1".to_vec()
422414
);
423415

424-
buffer.delete(b"key2".to_vec().into()).await;
425-
buffer
426-
.insert(b"key1".to_vec().into(), b"value".to_vec())
427-
.await;
416+
buffer.delete(b"key2".to_vec().into());
417+
buffer.insert(b"key1".to_vec().into(), b"value".to_vec());
428418
assert_eq!(
429419
block_on(buffer.batch_get_or_else(
430420
vec![b"key2".to_vec().into(), b"key1".to_vec().into()].into_iter(),
@@ -481,8 +471,8 @@ mod tests {
481471
}
482472

483473
// Check that multiple writes to the same key combine in the correct way.
484-
#[tokio::test]
485-
async fn state_machine() {
474+
#[test]
475+
fn state_machine() {
486476
let mut buffer = Buffer::new(false);
487477

488478
macro_rules! assert_entry {
@@ -493,28 +483,28 @@ mod tests {
493483

494484
// Insert + Delete = CheckNotExists
495485
let key: Key = b"key1".to_vec().into();
496-
buffer.insert(key.clone(), b"value1".to_vec()).await;
497-
buffer.delete(key.clone()).await;
486+
buffer.insert(key.clone(), b"value1".to_vec());
487+
buffer.delete(key.clone());
498488
assert_entry!(key, BufferEntry::CheckNotExist);
499489

500490
// CheckNotExists + Delete = CheckNotExists
501-
buffer.delete(key.clone()).await;
491+
buffer.delete(key.clone());
502492
assert_entry!(key, BufferEntry::CheckNotExist);
503493

504494
// CheckNotExists + Put = Insert
505-
buffer.put(key.clone(), b"value2".to_vec()).await;
495+
buffer.put(key.clone(), b"value2".to_vec());
506496
assert_entry!(key, BufferEntry::Insert(_));
507497

508498
// Insert + Put = Insert
509499
let key: Key = b"key2".to_vec().into();
510-
buffer.insert(key.clone(), b"value1".to_vec()).await;
511-
buffer.put(key.clone(), b"value2".to_vec()).await;
500+
buffer.insert(key.clone(), b"value1".to_vec());
501+
buffer.put(key.clone(), b"value2".to_vec());
512502
assert_entry!(key, BufferEntry::Insert(_));
513503

514504
// Delete + Insert = Put
515505
let key: Key = b"key3".to_vec().into();
516-
buffer.delete(key.clone()).await;
517-
buffer.insert(key.clone(), b"value1".to_vec()).await;
506+
buffer.delete(key.clone());
507+
buffer.insert(key.clone(), b"value1".to_vec());
518508
assert_entry!(key, BufferEntry::Put(_));
519509
}
520510
}

src/transaction/transaction.rs

Lines changed: 12 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -398,7 +398,7 @@ impl<PdC: PdClient> Transaction<PdC> {
398398
self.pessimistic_lock(iter::once(key.clone()), false)
399399
.await?;
400400
}
401-
self.buffer.put(key, value.into()).await;
401+
self.buffer.put(key, value.into());
402402
Ok(())
403403
}
404404

@@ -424,7 +424,7 @@ impl<PdC: PdClient> Transaction<PdC> {
424424
pub async fn insert(&mut self, key: impl Into<Key>, value: impl Into<Value>) -> Result<()> {
425425
self.check_allow_operation().await?;
426426
let key = key.into();
427-
if self.buffer.get(&key).await.is_some() {
427+
if self.buffer.get(&key).is_some() {
428428
return Err(Error::DuplicateKeyInsertion);
429429
}
430430
if self.is_pessimistic() {
@@ -434,7 +434,7 @@ impl<PdC: PdClient> Transaction<PdC> {
434434
)
435435
.await?;
436436
}
437-
self.buffer.insert(key, value.into()).await;
437+
self.buffer.insert(key, value.into());
438438
Ok(())
439439
}
440440

@@ -462,7 +462,7 @@ impl<PdC: PdClient> Transaction<PdC> {
462462
self.pessimistic_lock(iter::once(key.clone()), false)
463463
.await?;
464464
}
465-
self.buffer.delete(key).await;
465+
self.buffer.delete(key);
466466
Ok(())
467467
}
468468

@@ -497,7 +497,7 @@ impl<PdC: PdClient> Transaction<PdC> {
497497
match self.options.kind {
498498
TransactionKind::Optimistic => {
499499
for key in keys {
500-
self.buffer.lock(key.into()).await;
500+
self.buffer.lock(key.into());
501501
}
502502
}
503503
TransactionKind::Pessimistic(_) => {
@@ -535,8 +535,8 @@ impl<PdC: PdClient> Transaction<PdC> {
535535
*status = TransactionStatus::StartedCommit;
536536
}
537537

538-
let primary_key = self.buffer.get_primary_key().await;
539-
let mutations = self.buffer.to_proto_mutations().await;
538+
let primary_key = self.buffer.get_primary_key();
539+
let mutations = self.buffer.to_proto_mutations();
540540
if mutations.is_empty() {
541541
assert!(primary_key.is_none());
542542
return Ok(None);
@@ -595,8 +595,8 @@ impl<PdC: PdClient> Transaction<PdC> {
595595
*status = TransactionStatus::StartedRollback;
596596
}
597597

598-
let primary_key = self.buffer.get_primary_key().await;
599-
let mutations = self.buffer.to_proto_mutations().await;
598+
let primary_key = self.buffer.get_primary_key();
599+
let mutations = self.buffer.to_proto_mutations();
600600
let res = Committer::new(
601601
primary_key,
602602
mutations,
@@ -619,7 +619,7 @@ impl<PdC: PdClient> Transaction<PdC> {
619619
/// Returns the TTL set on the transaction's locks by TiKV.
620620
pub async fn send_heart_beat(&mut self) -> Result<u64> {
621621
self.check_allow_operation().await?;
622-
let primary_key = match self.buffer.get_primary_key().await {
622+
let primary_key = match self.buffer.get_primary_key() {
623623
Some(k) => k,
624624
None => return Err(Error::NoPrimaryKey),
625625
};
@@ -695,7 +695,6 @@ impl<PdC: PdClient> Transaction<PdC> {
695695
let primary_lock = self
696696
.buffer
697697
.get_primary_key()
698-
.await
699698
.unwrap_or_else(|| first_key.clone());
700699
let for_update_ts = self.rpc.clone().get_timestamp().await?;
701700
self.options.push_for_update_ts(for_update_ts.clone());
@@ -717,12 +716,12 @@ impl<PdC: PdClient> Transaction<PdC> {
717716
let pairs = plan.execute().await;
718717

719718
// primary key will be set here if needed
720-
self.buffer.primary_key_or(&first_key).await;
719+
self.buffer.primary_key_or(&first_key);
721720

722721
self.start_auto_heartbeat().await;
723722

724723
for key in keys {
725-
self.buffer.lock(key.key()).await;
724+
self.buffer.lock(key.key());
726725
}
727726

728727
pairs
@@ -755,7 +754,6 @@ impl<PdC: PdClient> Transaction<PdC> {
755754
let primary_key = self
756755
.buffer
757756
.get_primary_key()
758-
.await
759757
.expect("Primary key should exist");
760758
let start_ts = self.timestamp.clone();
761759
let region_backoff = self.options.retry_options.region_backoff.clone();

0 commit comments

Comments
 (0)