Skip to content

Commit 74db41c

Browse files
authored
txn: Error handling for pessimistic locks (#332)
* wip Signed-off-by: pingyu <yuping@pingcap.com> * wip Signed-off-by: pingyu <yuping@pingcap.com> * wip Signed-off-by: pingyu <yuping@pingcap.com> * close #313: add tests Signed-off-by: pingyu <yuping@pingcap.com> * trigger actions Signed-off-by: pingyu <yuping@pingcap.com> * Issue Number #313: fix CI by set timeout longer. Signed-off-by: pingyu <yuping@pingcap.com> * Issue Number #313: Add comment. Signed-off-by: pingyu <yuping@pingcap.com>
1 parent 196b06e commit 74db41c

File tree

10 files changed

+362
-48
lines changed

10 files changed

+362
-48
lines changed

.github/workflows/ci.yml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -107,4 +107,4 @@ jobs:
107107
- name: start tiup playground
108108
run: /home/runner/.tiup/bin/tiup playground nightly --mode tikv-slim --kv 3 --without-monitor --kv.config /home/runner/work/client-rust/client-rust/config/tikv.toml --pd.config /home/runner/work/client-rust/client-rust/config/pd.toml &
109109
- name: integration test
110-
run: make integration-test
110+
run: MULTI_REGION=1 make integration-test

src/raw/client.rs

Lines changed: 10 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -13,7 +13,7 @@ use crate::{
1313
pd::{PdClient, PdRpcClient},
1414
raw::lowering::*,
1515
request::{Collect, CollectSingle, Plan},
16-
BoundRange, ColumnFamily, Key, KvPair, Result, Value,
16+
Backoff, BoundRange, ColumnFamily, Key, KvPair, Result, Value,
1717
};
1818

1919
const MAX_RAW_KV_SCAN_LIMIT: u32 = 10240;
@@ -359,11 +359,19 @@ impl<PdC: PdClient> Client<PdC> {
359359
/// # });
360360
/// ```
361361
pub async fn delete_range(&self, range: impl Into<BoundRange>) -> Result<()> {
362+
self.delete_range_opt(range, DEFAULT_REGION_BACKOFF).await
363+
}
364+
365+
pub async fn delete_range_opt(
366+
&self,
367+
range: impl Into<BoundRange>,
368+
backoff: Backoff,
369+
) -> Result<()> {
362370
debug!(self.logger, "invoking raw delete_range request");
363371
self.assert_non_atomic()?;
364372
let request = new_raw_delete_range_request(range.into(), self.cf.clone());
365373
let plan = crate::request::PlanBuilder::new(self.rpc.clone(), request)
366-
.retry_multi_region(DEFAULT_REGION_BACKOFF)
374+
.retry_multi_region(backoff)
367375
.extract_error()
368376
.plan();
369377
plan.execute().await?;

src/request/plan.rs

Lines changed: 39 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@ use crate::{
1616
stats::tikv_stats,
1717
store::RegionStore,
1818
transaction::{resolve_locks, HasLocks},
19+
util::iter::FlatMapOkIterExt,
1920
Error, Result,
2021
};
2122

@@ -63,6 +64,11 @@ pub struct RetryableMultiRegion<P: Plan, PdC: PdClient> {
6364
pub(super) inner: P,
6465
pub pd_client: Arc<PdC>,
6566
pub backoff: Backoff,
67+
68+
/// Preserve all regions' results for other downstream plans to handle.
69+
/// If true, return Ok and preserve all regions' results, even if some of them are Err.
70+
/// Otherwise, return the first Err if there is any.
71+
pub preserve_region_results: bool,
6672
}
6773

6874
impl<P: Plan + Shardable, PdC: PdClient> RetryableMultiRegion<P, PdC>
@@ -76,6 +82,7 @@ where
7682
current_plan: P,
7783
backoff: Backoff,
7884
permits: Arc<Semaphore>,
85+
preserve_region_results: bool,
7986
) -> Result<<Self as Plan>::Result> {
8087
let shards = current_plan.shards(&pd_client).collect::<Vec<_>>().await;
8188
let mut handles = Vec::new();
@@ -89,16 +96,29 @@ where
8996
region_store,
9097
backoff.clone(),
9198
permits.clone(),
99+
preserve_region_results,
92100
));
93101
handles.push(handle);
94102
}
95-
Ok(try_join_all(handles)
96-
.await?
97-
.into_iter()
98-
.collect::<Result<Vec<_>>>()?
99-
.into_iter()
100-
.flatten()
101-
.collect())
103+
104+
let results = try_join_all(handles).await?;
105+
if preserve_region_results {
106+
Ok(results
107+
.into_iter()
108+
.flat_map_ok(|x| x)
109+
.map(|x| match x {
110+
Ok(r) => r,
111+
Err(e) => Err(e),
112+
})
113+
.collect())
114+
} else {
115+
Ok(results
116+
.into_iter()
117+
.collect::<Result<Vec<_>>>()?
118+
.into_iter()
119+
.flatten()
120+
.collect())
121+
}
102122
}
103123

104124
#[async_recursion]
@@ -108,6 +128,7 @@ where
108128
region_store: RegionStore,
109129
mut backoff: Backoff,
110130
permits: Arc<Semaphore>,
131+
preserve_region_results: bool,
111132
) -> Result<<Self as Plan>::Result> {
112133
// limit concurrent requests
113134
let permit = permits.acquire().await.unwrap();
@@ -125,7 +146,14 @@ where
125146
if !region_error_resolved {
126147
futures_timer::Delay::new(duration).await;
127148
}
128-
Self::single_plan_handler(pd_client, plan, backoff, permits).await
149+
Self::single_plan_handler(
150+
pd_client,
151+
plan,
152+
backoff,
153+
permits,
154+
preserve_region_results,
155+
)
156+
.await
129157
}
130158
None => Err(Error::RegionError(e)),
131159
}
@@ -242,6 +270,7 @@ impl<P: Plan, PdC: PdClient> Clone for RetryableMultiRegion<P, PdC> {
242270
inner: self.inner.clone(),
243271
pd_client: self.pd_client.clone(),
244272
backoff: self.backoff.clone(),
273+
preserve_region_results: self.preserve_region_results,
245274
}
246275
}
247276
}
@@ -263,6 +292,7 @@ where
263292
self.inner.clone(),
264293
self.backoff.clone(),
265294
concurrency_permits.clone(),
295+
self.preserve_region_results,
266296
)
267297
.await
268298
}
@@ -556,6 +586,7 @@ mod test {
556586
},
557587
pd_client: Arc::new(MockPdClient::default()),
558588
backoff: Backoff::no_backoff(),
589+
preserve_region_results: false,
559590
};
560591
assert!(plan.execute().await.is_err())
561592
}

src/request/plan_builder.rs

Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -113,13 +113,31 @@ where
113113
pub fn retry_multi_region(
114114
self,
115115
backoff: Backoff,
116+
) -> PlanBuilder<PdC, RetryableMultiRegion<P, PdC>, Targetted> {
117+
self.make_retry_multi_region(backoff, false)
118+
}
119+
120+
/// Preserve all results, even some of them are Err.
121+
/// To pass all responses to merge, and handle partial successful results correctly.
122+
pub fn retry_multi_region_preserve_results(
123+
self,
124+
backoff: Backoff,
125+
) -> PlanBuilder<PdC, RetryableMultiRegion<P, PdC>, Targetted> {
126+
self.make_retry_multi_region(backoff, true)
127+
}
128+
129+
fn make_retry_multi_region(
130+
self,
131+
backoff: Backoff,
132+
preserve_region_results: bool,
116133
) -> PlanBuilder<PdC, RetryableMultiRegion<P, PdC>, Targetted> {
117134
PlanBuilder {
118135
pd_client: self.pd_client.clone(),
119136
plan: RetryableMultiRegion {
120137
inner: self.plan,
121138
pd_client: self.pd_client,
122139
backoff,
140+
preserve_region_results,
123141
},
124142
phantom: PhantomData,
125143
}

src/transaction/buffer.rs

Lines changed: 41 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -174,6 +174,19 @@ impl Buffer {
174174
}
175175
}
176176

177+
/// Unlock the given key if locked.
178+
pub fn unlock(&mut self, key: &Key) {
179+
if let Some(value) = self.entry_map.get_mut(key) {
180+
if let BufferEntry::Locked(v) = value {
181+
if let Some(v) = v {
182+
*value = BufferEntry::Cached(v.take());
183+
} else {
184+
self.entry_map.remove(key);
185+
}
186+
}
187+
}
188+
}
189+
177190
/// Put a value into the buffer (does not write through).
178191
pub fn put(&mut self, key: Key, value: Value) {
179192
let mut entry = self.entry_map.entry(key.clone());
@@ -485,6 +498,12 @@ mod tests {
485498
};
486499
}
487500

501+
macro_rules! assert_entry_none {
502+
($key: ident) => {
503+
assert!(matches!(buffer.entry_map.get(&$key), None,))
504+
};
505+
}
506+
488507
// Insert + Delete = CheckNotExists
489508
let key: Key = b"key1".to_vec().into();
490509
buffer.insert(key.clone(), b"value1".to_vec());
@@ -510,5 +529,27 @@ mod tests {
510529
buffer.delete(key.clone());
511530
buffer.insert(key.clone(), b"value1".to_vec());
512531
assert_entry!(key, BufferEntry::Put(_));
532+
533+
// Lock + Unlock = None
534+
let key: Key = b"key4".to_vec().into();
535+
buffer.lock(key.clone());
536+
buffer.unlock(&key);
537+
assert_entry_none!(key);
538+
539+
// Cached + Lock + Unlock = Cached
540+
let key: Key = b"key5".to_vec().into();
541+
let val: Value = b"value5".to_vec();
542+
let val_ = val.clone();
543+
let r = block_on(buffer.get_or_else(key.clone(), move |_| ready(Ok(Some(val_)))));
544+
assert_eq!(r.unwrap().unwrap(), val);
545+
buffer.lock(key.clone());
546+
buffer.unlock(&key);
547+
assert_entry!(key, BufferEntry::Cached(Some(_)));
548+
assert_eq!(
549+
block_on(buffer.get_or_else(key, move |_| ready(Err(internal_err!("")))))
550+
.unwrap()
551+
.unwrap(),
552+
val
553+
);
513554
}
514555
}

0 commit comments

Comments
 (0)