Skip to content

Commit ceea0f9

Browse files
Calculate cleverer TTL for transaction (#250)
* Fix clippy Signed-off-by: Zijie Lu <wslzj40@gmail.com> * Address some review comments Signed-off-by: Zijie Lu <wslzj40@gmail.com> Fix clippy Signed-off-by: Zijie Lu <wslzj40@gmail.com> * Add key Signed-off-by: Zijie Lu <wslzj40@gmail.com> * Address review comments Signed-off-by: Zijie Lu <wslzj40@gmail.com> * use cfg test Signed-off-by: Zijie Lu <wslzj40@gmail.com> * fix heartbeat compile Signed-off-by: Andy Lok <andylokandy@hotmail.com> * fix heartbeat tests Signed-off-by: Andy Lok <andylokandy@hotmail.com> * Fix cargo Signed-off-by: Zijie Lu <wslzj40@gmail.com> * Fix test Signed-off-by: Zijie Lu <wslzj40@gmail.com> * Address review comments Signed-off-by: Zijie Lu <wslzj40@gmail.com> * nitpick Signed-off-by: Zijie Lu <wslzj40@gmail.com> * Fix test Signed-off-by: Zijie Lu <wslzj40@gmail.com> * cargo fmt Signed-off-by: Zijie Lu <wslzj40@gmail.com> * Fix test Signed-off-by: Zijie Lu <wslzj40@gmail.com> * Use local timestamp instead fetching ts from pd Signed-off-by: Zijie Lu <wslzj40@gmail.com> * Address review comment Signed-off-by: Zijie Lu <wslzj40@gmail.com> * improve unit test Signed-off-by: Andy Lok <andylokandy@hotmail.com> * fix test Signed-off-by: Andy Lok <andylokandy@hotmail.com> Co-authored-by: Andy Lok <andylokandy@hotmail.com>
1 parent 9b47f68 commit ceea0f9

File tree

7 files changed

+117
-51
lines changed

7 files changed

+117
-51
lines changed

Cargo.toml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -11,7 +11,7 @@ edition = "2018"
1111
[features]
1212
default = []
1313
# Enable integration tests with a running TiKV and PD instance.
14-
# Use $PD_ADDRS, comma separated, to set the addresses the tests use.
14+
# Use $PD_ADDRS, comma separated, to set the addresses the tests use.
1515
integration-tests = []
1616

1717
[lib]

src/kv/key.rs

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -126,6 +126,10 @@ impl Key {
126126
encoded.encode_bytes(&self.0, false).unwrap();
127127
Key(encoded)
128128
}
129+
130+
pub fn len(&self) -> usize {
131+
self.0.len()
132+
}
129133
}
130134

131135
impl From<Vec<u8>> for Key {

src/transaction/buffer.rs

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -224,6 +224,17 @@ impl Buffer {
224224
.collect()
225225
}
226226

227+
pub fn get_write_size(&self) -> usize {
228+
self.entry_map
229+
.iter()
230+
.map(|(k, v)| match v {
231+
BufferEntry::Put(val) | BufferEntry::Insert(val) => val.len() + k.len(),
232+
BufferEntry::Del => k.len(),
233+
_ => 0,
234+
})
235+
.sum()
236+
}
237+
227238
fn get_from_mutations(&self, key: &Key) -> MutationValue {
228239
self.entry_map
229240
.get(&key)

src/transaction/mod.rs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,8 @@
1111
pub use client::Client;
1212
pub(crate) use lock::{resolve_locks, HasLocks};
1313
pub use snapshot::Snapshot;
14+
#[doc(hidden)]
15+
pub use transaction::HeartbeatOption;
1416
pub use transaction::{CheckLevel, Transaction, TransactionOptions};
1517

1618
mod buffer;

src/transaction/transaction.rs

Lines changed: 82 additions & 42 deletions
Original file line numberDiff line numberDiff line change
@@ -11,7 +11,7 @@ use crate::{
1111
use derive_new::new;
1212
use fail::fail_point;
1313
use futures::{prelude::*, stream::BoxStream};
14-
use std::{iter, ops::RangeBounds, sync::Arc};
14+
use std::{iter, ops::RangeBounds, sync::Arc, time::Instant};
1515
use tikv_client_proto::{kvrpcpb, pdpb::Timestamp};
1616
use tokio::{sync::RwLock, time::Duration};
1717

@@ -60,6 +60,7 @@ pub struct Transaction<PdC: PdClient = PdRpcClient> {
6060
rpc: Arc<PdC>,
6161
options: TransactionOptions,
6262
is_heartbeat_started: bool,
63+
start_instant: Instant,
6364
}
6465

6566
impl<PdC: PdClient> Transaction<PdC> {
@@ -80,6 +81,7 @@ impl<PdC: PdClient> Transaction<PdC> {
8081
rpc,
8182
options,
8283
is_heartbeat_started: false,
84+
start_instant: std::time::Instant::now(),
8385
}
8486
}
8587

@@ -550,6 +552,8 @@ impl<PdC: PdClient> Transaction<PdC> {
550552
self.timestamp.clone(),
551553
self.rpc.clone(),
552554
self.options.clone(),
555+
self.buffer.get_write_size() as u64,
556+
self.start_instant,
553557
)
554558
.commit()
555559
.await;
@@ -603,6 +607,8 @@ impl<PdC: PdClient> Transaction<PdC> {
603607
self.timestamp.clone(),
604608
self.rpc.clone(),
605609
self.options.clone(),
610+
self.buffer.get_write_size() as u64,
611+
self.start_instant,
606612
)
607613
.rollback()
608614
.await;
@@ -617,6 +623,7 @@ impl<PdC: PdClient> Transaction<PdC> {
617623
/// Send a heart beat message to keep the transaction alive on the server and update its TTL.
618624
///
619625
/// Returns the TTL set on the transaction's locks by TiKV.
626+
#[doc(hidden)]
620627
pub async fn send_heart_beat(&mut self) -> Result<u64> {
621628
self.check_allow_operation().await?;
622629
let primary_key = match self.buffer.get_primary_key() {
@@ -745,7 +752,7 @@ impl<PdC: PdClient> Transaction<PdC> {
745752
}
746753

747754
async fn start_auto_heartbeat(&mut self) {
748-
if !self.options.auto_heartbeat || self.is_heartbeat_started {
755+
if !self.options.heartbeat_option.is_auto_heartbeat() || self.is_heartbeat_started {
749756
return;
750757
}
751758
self.is_heartbeat_started = true;
@@ -758,10 +765,14 @@ impl<PdC: PdClient> Transaction<PdC> {
758765
let start_ts = self.timestamp.clone();
759766
let region_backoff = self.options.retry_options.region_backoff.clone();
760767
let rpc = self.rpc.clone();
768+
let heartbeat_interval = match self.options.heartbeat_option {
769+
HeartbeatOption::NoHeartbeat => DEFAULT_HEARTBEAT_INTERVAL,
770+
HeartbeatOption::FixedTime(heartbeat_interval) => heartbeat_interval,
771+
};
761772

762773
let heartbeat_task = async move {
763774
loop {
764-
tokio::time::sleep(DEFAULT_HEARTBEAT_INTERVAL).await;
775+
tokio::time::sleep(heartbeat_interval).await;
765776
{
766777
let status = status.read().await;
767778
if matches!(
@@ -819,6 +830,17 @@ impl<PdC: PdClient> Drop for Transaction<PdC> {
819830
}
820831
}
821832

833+
/// The default max TTL of a lock in milliseconds. Also called `ManagedLockTTL` in TiDB.
834+
const MAX_TTL: u64 = 20000;
835+
/// The default TTL of a lock in milliseconds.
836+
const DEFAULT_LOCK_TTL: u64 = 3000;
837+
/// The default heartbeat interval
838+
const DEFAULT_HEARTBEAT_INTERVAL: Duration = Duration::from_millis(MAX_TTL / 2);
839+
/// TiKV recommends each RPC packet should be less than around 1MB. We keep KV size of
840+
/// each request below 16KB.
841+
const TXN_COMMIT_BATCH_SIZE: u64 = 16 * 1024;
842+
const TTL_FACTOR: f64 = 6000.0;
843+
822844
/// Optimistic or pessimistic transaction.
823845
#[derive(Clone, PartialEq, Debug)]
824846
pub enum TransactionKind {
@@ -844,8 +866,14 @@ pub struct TransactionOptions {
844866
retry_options: RetryOptions,
845867
/// What to do if the transaction is dropped without an attempt to commit or rollback
846868
check_level: CheckLevel,
847-
/// Whether heartbeat will be sent automatically
848-
auto_heartbeat: bool,
869+
#[doc(hidden)]
870+
heartbeat_option: HeartbeatOption,
871+
}
872+
873+
#[derive(Clone, PartialEq, Debug)]
874+
pub enum HeartbeatOption {
875+
NoHeartbeat,
876+
FixedTime(Duration),
849877
}
850878

851879
impl Default for TransactionOptions {
@@ -864,7 +892,7 @@ impl TransactionOptions {
864892
read_only: false,
865893
retry_options: RetryOptions::default_optimistic(),
866894
check_level: CheckLevel::Panic,
867-
auto_heartbeat: true,
895+
heartbeat_option: HeartbeatOption::FixedTime(DEFAULT_HEARTBEAT_INTERVAL),
868896
}
869897
}
870898

@@ -877,7 +905,7 @@ impl TransactionOptions {
877905
read_only: false,
878906
retry_options: RetryOptions::default_pessimistic(),
879907
check_level: CheckLevel::Panic,
880-
auto_heartbeat: true,
908+
heartbeat_option: HeartbeatOption::FixedTime(DEFAULT_HEARTBEAT_INTERVAL),
881909
}
882910
}
883911

@@ -935,10 +963,8 @@ impl TransactionOptions {
935963
}
936964
}
937965

938-
/// The transaction should not automatically send heartbeat messages to TiKV to keep itself
939-
// alive.
940-
pub fn no_auto_hearbeat(mut self) -> TransactionOptions {
941-
self.auto_heartbeat = false;
966+
pub fn heartbeat_option(mut self, heartbeat_option: HeartbeatOption) -> TransactionOptions {
967+
self.heartbeat_option = heartbeat_option;
942968
self
943969
}
944970

@@ -967,10 +993,11 @@ pub enum CheckLevel {
967993
None,
968994
}
969995

970-
/// The default TTL of a lock in milliseconds.
971-
const DEFAULT_LOCK_TTL: u64 = 3000;
972-
/// The default heartbeat interval.
973-
const DEFAULT_HEARTBEAT_INTERVAL: Duration = Duration::from_secs(1);
996+
impl HeartbeatOption {
997+
pub fn is_auto_heartbeat(&self) -> bool {
998+
!matches!(self, HeartbeatOption::NoHeartbeat)
999+
}
1000+
}
9741001

9751002
/// A struct wrapping the details of two-phase commit protocol (2PC).
9761003
///
@@ -988,6 +1015,8 @@ struct Committer<PdC: PdClient = PdRpcClient> {
9881015
options: TransactionOptions,
9891016
#[new(default)]
9901017
undetermined: bool,
1018+
write_size: u64,
1019+
start_instant: Instant,
9911020
}
9921021

9931022
impl<PdC: PdClient> Committer<PdC> {
@@ -1026,20 +1055,20 @@ impl<PdC: PdClient> Committer<PdC> {
10261055

10271056
async fn prewrite(&mut self) -> Result<Option<Timestamp>> {
10281057
let primary_lock = self.primary_key.clone().unwrap();
1029-
// FIXME: calculate TTL for big transactions
1030-
let lock_ttl = DEFAULT_LOCK_TTL;
1058+
let elapsed = self.start_instant.elapsed().as_millis() as u64;
1059+
let lock_ttl = self.calc_txn_lock_ttl();
10311060
let mut request = match &self.options.kind {
10321061
TransactionKind::Optimistic => new_prewrite_request(
10331062
self.mutations.clone(),
10341063
primary_lock,
10351064
self.start_version.clone(),
1036-
lock_ttl,
1065+
lock_ttl + elapsed,
10371066
),
10381067
TransactionKind::Pessimistic(for_update_ts) => new_pessimistic_prewrite_request(
10391068
self.mutations.clone(),
10401069
primary_lock,
10411070
self.start_version.clone(),
1042-
lock_ttl,
1071+
lock_ttl + elapsed,
10431072
for_update_ts.clone(),
10441073
),
10451074
};
@@ -1173,6 +1202,16 @@ impl<PdC: PdClient> Committer<PdC> {
11731202
}
11741203
Ok(())
11751204
}
1205+
1206+
fn calc_txn_lock_ttl(&mut self) -> u64 {
1207+
let mut lock_ttl = DEFAULT_LOCK_TTL;
1208+
if self.write_size > TXN_COMMIT_BATCH_SIZE {
1209+
let size_mb = self.write_size as f64 / 1024.0 / 1024.0;
1210+
lock_ttl = (TTL_FACTOR * size_mb.sqrt()) as u64;
1211+
lock_ttl = lock_ttl.min(MAX_TTL).max(DEFAULT_LOCK_TTL);
1212+
}
1213+
lock_ttl
1214+
}
11761215
}
11771216

11781217
#[derive(PartialEq)]
@@ -1197,6 +1236,7 @@ enum TransactionStatus {
11971236
mod tests {
11981237
use crate::{
11991238
mock::{MockKvClient, MockPdClient},
1239+
transaction::HeartbeatOption,
12001240
Transaction, TransactionOptions,
12011241
};
12021242
use fail::FailScenario;
@@ -1207,40 +1247,42 @@ mod tests {
12071247
atomic::{AtomicUsize, Ordering},
12081248
Arc,
12091249
},
1250+
time::Duration,
12101251
};
12111252
use tikv_client_proto::{kvrpcpb, pdpb::Timestamp};
12121253

12131254
#[tokio::test]
12141255
async fn test_optimistic_heartbeat() -> Result<(), io::Error> {
12151256
let scenario = FailScenario::setup();
1216-
fail::cfg("after-prewrite", "sleep(10000)").unwrap();
1257+
fail::cfg("after-prewrite", "sleep(1500)").unwrap();
12171258
let heartbeats = Arc::new(AtomicUsize::new(0));
12181259
let heartbeats_cloned = heartbeats.clone();
12191260
let pd_client = Arc::new(MockPdClient::new(MockKvClient::with_dispatch_hook(
12201261
move |req: &dyn Any| {
1221-
if let Some(_heartbeat) = req.downcast_ref::<kvrpcpb::TxnHeartBeatRequest>() {
1262+
if let Some(_) = req.downcast_ref::<kvrpcpb::TxnHeartBeatRequest>() {
12221263
heartbeats_cloned.fetch_add(1, Ordering::SeqCst);
1223-
return Ok(Box::new(kvrpcpb::TxnHeartBeatResponse::default()) as Box<dyn Any>);
1224-
} else if let Some(_prewrite) = req.downcast_ref::<kvrpcpb::PrewriteRequest>() {
1225-
return Ok(Box::new(kvrpcpb::PrewriteResponse::default()) as Box<dyn Any>);
1264+
Ok(Box::new(kvrpcpb::TxnHeartBeatResponse::default()) as Box<dyn Any>)
1265+
} else if let Some(_) = req.downcast_ref::<kvrpcpb::PrewriteRequest>() {
1266+
Ok(Box::new(kvrpcpb::PrewriteResponse::default()) as Box<dyn Any>)
1267+
} else {
1268+
Ok(Box::new(kvrpcpb::CommitResponse::default()) as Box<dyn Any>)
12261269
}
1227-
Ok(Box::new(kvrpcpb::CommitResponse::default()) as Box<dyn Any>)
12281270
},
12291271
)));
12301272
let key1 = "key1".to_owned();
12311273
let mut heartbeat_txn = Transaction::new(
12321274
Timestamp::default(),
12331275
pd_client,
1234-
TransactionOptions::new_optimistic(),
1276+
TransactionOptions::new_optimistic()
1277+
.heartbeat_option(HeartbeatOption::FixedTime(Duration::from_secs(1))),
12351278
);
12361279
heartbeat_txn.put(key1.clone(), "foo").await.unwrap();
12371280
let heartbeat_txn_handle = tokio::task::spawn_blocking(move || {
12381281
assert!(futures::executor::block_on(heartbeat_txn.commit()).is_ok())
12391282
});
12401283
assert_eq!(heartbeats.load(Ordering::SeqCst), 0);
1241-
tokio::time::sleep(tokio::time::Duration::from_secs(5)).await;
12421284
heartbeat_txn_handle.await.unwrap();
1243-
assert!(heartbeats.load(Ordering::SeqCst) >= 1);
1285+
assert_eq!(heartbeats.load(Ordering::SeqCst), 1);
12441286
scenario.teardown();
12451287
Ok(())
12461288
}
@@ -1251,35 +1293,33 @@ mod tests {
12511293
let heartbeats_cloned = heartbeats.clone();
12521294
let pd_client = Arc::new(MockPdClient::new(MockKvClient::with_dispatch_hook(
12531295
move |req: &dyn Any| {
1254-
if let Some(_heartbeat) = req.downcast_ref::<kvrpcpb::TxnHeartBeatRequest>() {
1296+
if let Some(_) = req.downcast_ref::<kvrpcpb::TxnHeartBeatRequest>() {
12551297
heartbeats_cloned.fetch_add(1, Ordering::SeqCst);
1256-
return Ok(Box::new(kvrpcpb::TxnHeartBeatResponse::default()) as Box<dyn Any>);
1257-
} else if let Some(_prewrite) = req.downcast_ref::<kvrpcpb::PrewriteRequest>() {
1258-
return Ok(Box::new(kvrpcpb::PrewriteResponse::default()) as Box<dyn Any>);
1259-
} else if let Some(_pessimistic_lock) =
1260-
req.downcast_ref::<kvrpcpb::PessimisticLockRequest>()
1261-
{
1262-
return Ok(
1263-
Box::new(kvrpcpb::PessimisticLockResponse::default()) as Box<dyn Any>
1264-
);
1298+
Ok(Box::new(kvrpcpb::TxnHeartBeatResponse::default()) as Box<dyn Any>)
1299+
} else if let Some(_) = req.downcast_ref::<kvrpcpb::PrewriteRequest>() {
1300+
Ok(Box::new(kvrpcpb::PrewriteResponse::default()) as Box<dyn Any>)
1301+
} else if let Some(_) = req.downcast_ref::<kvrpcpb::PessimisticLockRequest>() {
1302+
Ok(Box::new(kvrpcpb::PessimisticLockResponse::default()) as Box<dyn Any>)
1303+
} else {
1304+
Ok(Box::new(kvrpcpb::CommitResponse::default()) as Box<dyn Any>)
12651305
}
1266-
Ok(Box::new(kvrpcpb::CommitResponse::default()) as Box<dyn Any>)
12671306
},
12681307
)));
12691308
let key1 = "key1".to_owned();
12701309
let mut heartbeat_txn = Transaction::new(
12711310
Timestamp::default(),
12721311
pd_client,
1273-
TransactionOptions::new_pessimistic(),
1312+
TransactionOptions::new_pessimistic()
1313+
.heartbeat_option(HeartbeatOption::FixedTime(Duration::from_secs(1))),
12741314
);
12751315
heartbeat_txn.put(key1.clone(), "foo").await.unwrap();
12761316
assert_eq!(heartbeats.load(Ordering::SeqCst), 0);
1277-
tokio::time::sleep(tokio::time::Duration::from_millis(5000)).await;
1317+
tokio::time::sleep(tokio::time::Duration::from_millis(1500)).await;
1318+
assert_eq!(heartbeats.load(Ordering::SeqCst), 1);
12781319
let heartbeat_txn_handle = tokio::spawn(async move {
12791320
assert!(heartbeat_txn.commit().await.is_ok());
12801321
});
12811322
heartbeat_txn_handle.await.unwrap();
1282-
assert!(heartbeats.load(Ordering::SeqCst) > 1);
12831323
Ok(())
12841324
}
12851325
}

0 commit comments

Comments
 (0)