Skip to content

Commit dd34500

Browse files
authored
transaction: Add batch_mutate interface (#418)
Signed-off-by: Ping Yu <yuping@pingcap.com>
1 parent d42b31a commit dd34500

File tree

4 files changed

+192
-2
lines changed

4 files changed

+192
-2
lines changed

src/transaction/buffer.rs

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -244,6 +244,15 @@ impl Buffer {
244244
}
245245
}
246246

247+
pub(crate) fn mutate(&mut self, m: kvrpcpb::Mutation) {
248+
let op = kvrpcpb::Op::from_i32(m.op).unwrap();
249+
match op {
250+
kvrpcpb::Op::Put => self.put(m.key.into(), m.value),
251+
kvrpcpb::Op::Del => self.delete(m.key.into()),
252+
_ => unimplemented!("only put and delete are supported in mutate"),
253+
};
254+
}
255+
247256
/// Converts the buffered mutations to the proto buffer version
248257
pub fn to_proto_mutations(&self) -> Vec<kvrpcpb::Mutation> {
249258
self.entry_map

src/transaction/transaction.rs

Lines changed: 50 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -522,6 +522,56 @@ impl<Cod: Codec, PdC: PdClient<Codec = Cod>> Transaction<Cod, PdC> {
522522
Ok(())
523523
}
524524

525+
/// Batch mutate the database.
526+
///
527+
/// Only `Put` and `Delete` are supported.
528+
///
529+
/// # Examples
530+
///
531+
/// ```rust,no_run
532+
/// # use tikv_client::{Key, Config, TransactionClient, proto::kvrpcpb};
533+
/// # use futures::prelude::*;
534+
/// # futures::executor::block_on(async {
535+
/// # let client = TransactionClient::new(vec!["192.168.0.100", "192.168.0.101"]).await.unwrap();
536+
/// let mut txn = client.begin_optimistic().await.unwrap();
537+
/// let mutations = vec![
538+
/// kvrpcpb::Mutation {
539+
/// op: kvrpcpb::Op::Del.into(),
540+
/// key: b"k0".to_vec(),
541+
/// ..Default::default()
542+
/// },
543+
/// kvrpcpb::Mutation {
544+
/// op: kvrpcpb::Op::Put.into(),
545+
/// key: b"k1".to_vec(),
546+
/// value: b"v1".to_vec(),
547+
/// ..Default::default()
548+
/// },
549+
/// ];
550+
/// txn.batch_mutate(mutations).await.unwrap();
551+
/// txn.commit().await.unwrap();
552+
/// # });
553+
/// ```
554+
pub async fn batch_mutate(
555+
&mut self,
556+
mutations: impl IntoIterator<Item = kvrpcpb::Mutation>,
557+
) -> Result<()> {
558+
debug!("invoking transactional batch mutate request");
559+
self.check_allow_operation().await?;
560+
if self.is_pessimistic() {
561+
let mutations: Vec<kvrpcpb::Mutation> = mutations.into_iter().collect();
562+
self.pessimistic_lock(mutations.iter().map(|m| Key::from(m.key.clone())), false)
563+
.await?;
564+
for m in mutations {
565+
self.buffer.mutate(m);
566+
}
567+
} else {
568+
for m in mutations.into_iter() {
569+
self.buffer.mutate(m);
570+
}
571+
}
572+
Ok(())
573+
}
574+
525575
/// Lock the given keys without mutating their values.
526576
///
527577
/// In optimistic mode, write conflicts are not checked until commit.

tests/common/mod.rs

Lines changed: 10 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -10,12 +10,12 @@ use std::time::Duration;
1010
use log::info;
1111
use log::warn;
1212
use rand::Rng;
13-
use tikv_client::ColumnFamily;
1413
use tikv_client::Key;
1514
use tikv_client::RawClient;
1615
use tikv_client::Result;
1716
use tikv_client::Transaction;
1817
use tikv_client::TransactionClient;
18+
use tikv_client::{ColumnFamily, Snapshot, TransactionOptions};
1919
use tokio::time::sleep;
2020

2121
const ENV_PD_ADDRS: &str = "PD_ADDRS";
@@ -147,6 +147,15 @@ pub fn gen_u32_keys(num: u32, rng: &mut impl Rng) -> HashSet<Vec<u8>> {
147147
set
148148
}
149149

150+
pub async fn snapshot(client: &TransactionClient, is_pessimistic: bool) -> Result<Snapshot> {
151+
let options = if is_pessimistic {
152+
TransactionOptions::new_pessimistic()
153+
} else {
154+
TransactionOptions::new_optimistic()
155+
};
156+
Ok(client.snapshot(client.current_timestamp().await?, options))
157+
}
158+
150159
/// Copied from https://github.com/tikv/tikv/blob/d86a449d7f5b656cef28576f166e73291f501d77/components/tikv_util/src/macros.rs#L55
151160
/// Simulates Go's defer.
152161
///

tests/integration_tests.rs

Lines changed: 123 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -21,8 +21,9 @@ use rand::seq::IteratorRandom;
2121
use rand::thread_rng;
2222
use rand::Rng;
2323
use serial_test::serial;
24+
use tikv_client::backoff::DEFAULT_REGION_BACKOFF;
25+
use tikv_client::proto::kvrpcpb;
2426
use tikv_client::transaction::HeartbeatOption;
25-
use tikv_client::BoundRange;
2627
use tikv_client::Error;
2728
use tikv_client::Key;
2829
use tikv_client::KvPair;
@@ -31,6 +32,7 @@ use tikv_client::Result;
3132
use tikv_client::TransactionClient;
3233
use tikv_client::TransactionOptions;
3334
use tikv_client::Value;
35+
use tikv_client::{Backoff, BoundRange, RetryOptions, Transaction};
3436

3537
// Parameters used in test
3638
const NUM_PEOPLE: u32 = 100;
@@ -1078,3 +1080,123 @@ async fn txn_key_exists() -> Result<()> {
10781080
t3.commit().await?;
10791081
Ok(())
10801082
}
1083+
1084+
#[tokio::test]
1085+
#[serial]
1086+
async fn txn_batch_mutate_optimistic() -> Result<()> {
1087+
init().await?;
1088+
let client = TransactionClient::new(pd_addrs()).await?;
1089+
1090+
// Put k0
1091+
{
1092+
let mut txn = client.begin_optimistic().await?;
1093+
txn.put(b"k0".to_vec(), b"v0".to_vec()).await?;
1094+
txn.commit().await?;
1095+
}
1096+
// Delete k0 and put k1, k2
1097+
do_mutate(false).await.unwrap();
1098+
// Read and verify
1099+
verify_mutate(false).await;
1100+
Ok(())
1101+
}
1102+
1103+
#[tokio::test]
1104+
#[serial]
1105+
async fn txn_batch_mutate_pessimistic() -> Result<()> {
1106+
init().await?;
1107+
let client = TransactionClient::new(pd_addrs()).await?;
1108+
1109+
// Put k0
1110+
{
1111+
let mut txn = client.begin_pessimistic().await?;
1112+
txn.put(b"k0".to_vec(), b"v0".to_vec()).await?;
1113+
txn.commit().await?;
1114+
}
1115+
// txn1 lock k0, to verify pessimistic locking.
1116+
let mut txn1 = client.begin_pessimistic().await?;
1117+
txn1.put(b"k0".to_vec(), b"vv".to_vec()).await?;
1118+
1119+
// txn2 is blocked by txn1, then timeout.
1120+
let txn2_handle = tokio::spawn(do_mutate(true));
1121+
assert!(matches!(
1122+
txn2_handle.await?.unwrap_err(),
1123+
Error::PessimisticLockError { .. }
1124+
));
1125+
1126+
let txn3_handle = tokio::spawn(do_mutate(true));
1127+
// txn1 rollback to release lock.
1128+
txn1.rollback().await?;
1129+
txn3_handle.await?.unwrap();
1130+
1131+
// Read and verify
1132+
verify_mutate(true).await;
1133+
Ok(())
1134+
}
1135+
1136+
async fn begin_mutate(client: &TransactionClient, is_pessimistic: bool) -> Result<Transaction> {
1137+
if is_pessimistic {
1138+
let options = TransactionOptions::new_pessimistic().retry_options(RetryOptions {
1139+
region_backoff: DEFAULT_REGION_BACKOFF,
1140+
lock_backoff: Backoff::no_jitter_backoff(500, 500, 2),
1141+
});
1142+
client.begin_with_options(options).await
1143+
} else {
1144+
client.begin_optimistic().await
1145+
}
1146+
}
1147+
1148+
async fn do_mutate(is_pessimistic: bool) -> Result<()> {
1149+
let client = TransactionClient::new(pd_addrs()).await.unwrap();
1150+
let mut txn = begin_mutate(&client, is_pessimistic).await.unwrap();
1151+
1152+
let mutations = vec![
1153+
kvrpcpb::Mutation {
1154+
op: kvrpcpb::Op::Del.into(),
1155+
key: b"k0".to_vec(),
1156+
..Default::default()
1157+
},
1158+
kvrpcpb::Mutation {
1159+
op: kvrpcpb::Op::Put.into(),
1160+
key: b"k1".to_vec(),
1161+
value: b"v1".to_vec(),
1162+
..Default::default()
1163+
},
1164+
kvrpcpb::Mutation {
1165+
op: kvrpcpb::Op::Put.into(),
1166+
key: b"k2".to_vec(),
1167+
value: b"v2".to_vec(),
1168+
..Default::default()
1169+
},
1170+
];
1171+
1172+
match txn.batch_mutate(mutations).await {
1173+
Ok(()) => {
1174+
txn.commit().await?;
1175+
Ok(())
1176+
}
1177+
Err(err) => {
1178+
let _ = txn.rollback().await;
1179+
Err(err)
1180+
}
1181+
}
1182+
}
1183+
1184+
async fn verify_mutate(is_pessimistic: bool) {
1185+
let client = TransactionClient::new(pd_addrs()).await.unwrap();
1186+
let mut snapshot = snapshot(&client, is_pessimistic).await.unwrap();
1187+
let res: HashMap<Key, Value> = snapshot
1188+
.batch_get(vec!["k0".to_owned(), "k1".to_owned(), "k2".to_owned()])
1189+
.await
1190+
.unwrap()
1191+
.map(|pair| (pair.0, pair.1))
1192+
.collect();
1193+
assert_eq!(res.len(), 2);
1194+
assert_eq!(
1195+
res.get(&Key::from("k1".to_owned())),
1196+
Some(Value::from("v1".to_owned())).as_ref()
1197+
);
1198+
assert_eq!(
1199+
res.get(&Key::from("k2".to_owned())),
1200+
Some(Value::from("v2".to_owned())).as_ref()
1201+
);
1202+
}

0 commit comments

Comments
 (0)