Skip to content

Commit 8534111

Browse files
committed
fix
1 parent 99e366e commit 8534111

File tree

11 files changed

+88
-71
lines changed

11 files changed

+88
-71
lines changed

src/common/errors.rs

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -113,6 +113,10 @@ pub enum Error {
113113
inner: Box<Error>,
114114
success_keys: Vec<Vec<u8>>,
115115
},
116+
#[error("Keyspace is not enabled on the server, please enable it by setting `storage.api-version = 2`")]
117+
ServerKeyspaceNotEnabled,
118+
#[error("Keyspace is not enabled on the client, please enable it by `Config::with_default_keyspace()` or `Config::with_keyspace()`")]
119+
ClientKeyspaceNotEnabled,
116120
}
117121

118122
impl From<crate::proto::errorpb::Error> for Error {

src/config.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -88,15 +88,15 @@ impl Config {
8888

8989
/// Set to use default keyspace.
9090
///
91-
/// Server should enable `api-version = 2` to use this feature.
91+
/// Server should enable `storage.api-version = 2` to use this feature.
9292
#[must_use]
9393
pub fn with_default_keyspace(self) -> Self {
9494
self.with_keyspace("DEFAULT")
9595
}
9696

9797
/// Set the use keyspace for the client.
9898
///
99-
/// Server should enable `api-version = 2` to use this feature.
99+
/// Server should enable `storage.api-version = 2` to use this feature.
100100
#[must_use]
101101
pub fn with_keyspace(mut self, keyspace: &str) -> Self {
102102
self.keyspace = Some(keyspace.to_owned());

src/raw/client.rs

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -233,6 +233,7 @@ impl<PdC: PdClient> Client<PdC> {
233233
let request = new_raw_get_request(key, self.cf.clone());
234234
let plan = crate::request::PlanBuilder::new(self.rpc.clone(), self.api_version, request)
235235
.retry_multi_region(self.backoff.clone())
236+
.extract_error()
236237
.merge(CollectSingle)
237238
.post_process_default()
238239
.plan();
@@ -268,6 +269,7 @@ impl<PdC: PdClient> Client<PdC> {
268269
let request = new_raw_batch_get_request(keys, self.cf.clone());
269270
let plan = crate::request::PlanBuilder::new(self.rpc.clone(), self.api_version, request)
270271
.retry_multi_region(self.backoff.clone())
272+
.extract_error()
271273
.merge(Collect)
272274
.plan();
273275
plan.execute().await.map(|r| {
@@ -581,6 +583,7 @@ impl<PdC: PdClient> Client<PdC> {
581583
);
582584
let plan = crate::request::PlanBuilder::new(self.rpc.clone(), self.api_version, req)
583585
.retry_multi_region(self.backoff.clone())
586+
.extract_error()
584587
.merge(CollectSingle)
585588
.post_process_default()
586589
.plan();
@@ -660,6 +663,7 @@ impl<PdC: PdClient> Client<PdC> {
660663
crate::request::PlanBuilder::new(self.rpc.clone(), self.api_version, request)
661664
.single_region_with_store(region_store.clone())
662665
.await?
666+
.extract_error()
663667
.plan()
664668
.execute()
665669
.await?;
@@ -719,6 +723,7 @@ impl<PdC: PdClient> Client<PdC> {
719723
let request = new_raw_batch_scan_request(ranges, each_limit, key_only, self.cf.clone());
720724
let plan = crate::request::PlanBuilder::new(self.rpc.clone(), self.api_version, request)
721725
.retry_multi_region(self.backoff.clone())
726+
.extract_error()
722727
.merge(Collect)
723728
.plan();
724729
plan.execute().await.map(|r| {

src/request/api_version.rs

Lines changed: 2 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -163,7 +163,7 @@ fn keyspace_prefix(keyspace_id: u32, key_mode: KeyMode) -> [u8; KEYSPACE_PREFIX_
163163
fn prepend_bytes<const N: usize>(vec: &mut Vec<u8>, prefix: &[u8; N]) {
164164
unsafe {
165165
vec.reserve_exact(N);
166-
std::ptr::copy(vec.as_ptr(), vec.as_mut_ptr().offset(N as isize), vec.len());
166+
std::ptr::copy(vec.as_ptr(), vec.as_mut_ptr().add(N), vec.len());
167167
std::ptr::copy_nonoverlapping(prefix.as_ptr(), vec.as_mut_ptr(), N);
168168
vec.set_len(vec.len() + N);
169169
}
@@ -172,11 +172,7 @@ fn prepend_bytes<const N: usize>(vec: &mut Vec<u8>, prefix: &[u8; N]) {
172172
fn pretruncate_bytes<const N: usize>(vec: &mut Vec<u8>) {
173173
assert!(vec.len() >= N);
174174
unsafe {
175-
std::ptr::copy(
176-
vec.as_ptr().offset(N as isize),
177-
vec.as_mut_ptr(),
178-
vec.len() - N,
179-
);
175+
std::ptr::copy(vec.as_ptr().add(N), vec.as_mut_ptr(), vec.len() - N);
180176
vec.set_len(vec.len() - N);
181177
}
182178
}

src/request/mod.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -112,7 +112,7 @@ mod test {
112112

113113
#[tokio::test]
114114
async fn test_region_retry() {
115-
#[derive(Clone)]
115+
#[derive(Debug, Clone)]
116116
struct MockRpcResponse;
117117

118118
impl HasKeyErrors for MockRpcResponse {

src/request/plan.rs

Lines changed: 49 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -599,7 +599,7 @@ where
599599
}
600600
}
601601

602-
#[derive(Default)]
602+
#[derive(Debug, Default)]
603603
pub struct CleanupLocksResult {
604604
pub region_error: Option<errorpb::Error>,
605605
pub key_error: Option<Vec<Error>>,
@@ -784,18 +784,54 @@ where
784784
type Result = P::Result;
785785

786786
async fn execute(&self) -> Result<Self::Result> {
787-
let mut result = self.inner.execute().await?;
788-
if let Some(errors) = result.key_errors() {
789-
Err(Error::ExtractedErrors(errors))
790-
} else if let Some(errors) = result.region_errors() {
791-
Err(Error::ExtractedErrors(
792-
errors
793-
.into_iter()
794-
.map(|e| Error::RegionError(Box::new(e)))
795-
.collect(),
796-
))
797-
} else {
798-
Ok(result)
787+
let result = self.inner.execute().await;
788+
let mut errors = Vec::new();
789+
match result {
790+
Ok(mut resp) => {
791+
if let Some(e) = resp.key_errors() {
792+
errors.extend(e);
793+
}
794+
if let Some(e) = resp.region_errors() {
795+
errors.extend(e.into_iter().map(|e| Error::RegionError(Box::new(e))));
796+
}
797+
798+
if errors.is_empty() {
799+
return Ok(resp);
800+
}
801+
}
802+
Err(Error::MultipleKeyErrors(e)) => errors.extend(e),
803+
Err(e) => errors.push(e),
804+
};
805+
for error in &mut errors {
806+
match error {
807+
Error::KvError { message }
808+
if message
809+
.contains("Api version in request does not match with TiKV storage")
810+
&& message.contains("storage: V1, request: V2") =>
811+
{
812+
*error = Error::ServerKeyspaceNotEnabled
813+
}
814+
Error::KvError { message }
815+
if message
816+
.contains("Api version in request does not match with TiKV storage")
817+
&& message.contains("storage: V1ttl, request: V2") =>
818+
{
819+
*error = Error::ServerKeyspaceNotEnabled
820+
}
821+
Error::KvError { message }
822+
if message
823+
.contains("Api version in request does not match with TiKV storage")
824+
&& message.contains("storage: V2, request: V1") =>
825+
{
826+
*error = Error::ClientKeyspaceNotEnabled
827+
}
828+
_ => (),
829+
}
830+
}
831+
match errors.len() {
832+
0 => unreachable!(),
833+
1 => Err(errors.pop().unwrap()),
834+
_ => Err(Error::ExtractedErrors(errors)),
799835
}
800836
}
801837
}

src/store/errors.rs

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,5 @@
11
// Copyright 2019 TiKV Project Authors. Licensed under Apache-2.0.
22

3-
use std::fmt::Display;
4-
53
use crate::proto::kvrpcpb;
64
use crate::Error;
75

@@ -162,10 +160,11 @@ impl HasKeyErrors for kvrpcpb::PessimisticRollbackResponse {
162160
}
163161
}
164162

165-
impl<T: HasKeyErrors, E: Display> HasKeyErrors for Result<T, E> {
163+
impl<T: HasKeyErrors> HasKeyErrors for Result<T, Error> {
166164
fn key_errors(&mut self) -> Option<Vec<Error>> {
167165
match self {
168166
Ok(x) => x.key_errors(),
167+
Err(Error::MultipleKeyErrors(e)) => Some(std::mem::take(e)),
169168
Err(e) => Some(vec![Error::StringError(e.to_string())]),
170169
}
171170
}

src/transaction/client.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -299,6 +299,7 @@ impl Client {
299299
let req = new_unsafe_destroy_range_request(range);
300300
let plan = crate::request::PlanBuilder::new(self.pd.clone(), self.api_version, req)
301301
.all_stores(DEFAULT_STORE_BACKOFF)
302+
.extract_error()
302303
.merge(crate::request::Collect)
303304
.plan();
304305
plan.execute().await

src/transaction/transaction.rs

Lines changed: 18 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -146,6 +146,7 @@ impl<PdC: PdClient> Transaction<PdC> {
146146
let plan = PlanBuilder::new(rpc, api_version, request)
147147
.resolve_lock(retry_options.lock_backoff, api_version)
148148
.retry_multi_region(DEFAULT_REGION_BACKOFF)
149+
.extract_error()
149150
.merge(CollectSingle)
150151
.post_process_default()
151152
.plan();
@@ -281,6 +282,7 @@ impl<PdC: PdClient> Transaction<PdC> {
281282
let plan = PlanBuilder::new(rpc, api_version, request)
282283
.resolve_lock(retry_options.lock_backoff, api_version)
283284
.retry_multi_region(retry_options.region_backoff)
285+
.extract_error()
284286
.merge(Collect)
285287
.plan();
286288
plan.execute()
@@ -333,7 +335,11 @@ impl<PdC: PdClient> Transaction<PdC> {
333335
let keys = keys
334336
.into_iter()
335337
.map(move |k| k.into().encode_version(api_version, KeyMode::Txn));
336-
self.pessimistic_lock(keys, true).await
338+
let pairs = self
339+
.pessimistic_lock(keys, true)
340+
.await?
341+
.truncate_version(api_version);
342+
Ok(pairs)
337343
}
338344
}
339345

@@ -541,23 +547,14 @@ impl<PdC: PdClient> Transaction<PdC> {
541547
/// # Examples
542548
///
543549
/// ```rust,no_run
544-
/// # use tikv_client::{Key, Config, TransactionClient, proto::kvrpcpb};
550+
/// # use tikv_client::{Key, Config, TransactionClient, transaction::Mutation};
545551
/// # use futures::prelude::*;
546552
/// # futures::executor::block_on(async {
547553
/// # let client = TransactionClient::new(vec!["192.168.0.100", "192.168.0.101"]).await.unwrap();
548554
/// let mut txn = client.begin_optimistic().await.unwrap();
549555
/// let mutations = vec![
550-
/// kvrpcpb::Mutation {
551-
/// op: kvrpcpb::Op::Del.into(),
552-
/// key: b"k0".to_vec(),
553-
/// ..Default::default()
554-
/// },
555-
/// kvrpcpb::Mutation {
556-
/// op: kvrpcpb::Op::Put.into(),
557-
/// key: b"k1".to_vec(),
558-
/// value: b"v1".to_vec(),
559-
/// ..Default::default()
560-
/// },
556+
/// Mutation::Delete("k0".to_owned().into()),
557+
/// Mutation::Put("k1".to_owned().into(), b"v1".to_vec()),
561558
/// ];
562559
/// txn.batch_mutate(mutations).await.unwrap();
563560
/// txn.commit().await.unwrap();
@@ -574,7 +571,7 @@ impl<PdC: PdClient> Transaction<PdC> {
574571
.map(|mutation| mutation.encode_version(self.api_version, KeyMode::Txn))
575572
.collect();
576573
if self.is_pessimistic() {
577-
self.pessimistic_lock(mutations.iter().map(|m| Key::from(m.key().clone())), false)
574+
self.pessimistic_lock(mutations.iter().map(|m| m.key().clone()), false)
578575
.await?;
579576
for m in mutations {
580577
self.buffer.mutate(m);
@@ -623,12 +620,11 @@ impl<PdC: PdClient> Transaction<PdC> {
623620
match self.options.kind {
624621
TransactionKind::Optimistic => {
625622
for key in keys {
626-
self.buffer.lock(key.into());
623+
self.buffer.lock(key);
627624
}
628625
}
629626
TransactionKind::Pessimistic(_) => {
630-
self.pessimistic_lock(keys, false)
631-
.await?;
627+
self.pessimistic_lock(keys, false).await?;
632628
}
633629
}
634630
Ok(())
@@ -771,6 +767,7 @@ impl<PdC: PdClient> Transaction<PdC> {
771767
self.api_version,
772768
)
773769
.retry_multi_region(self.options.retry_options.region_backoff.clone())
770+
.extract_error()
774771
.merge(CollectSingle)
775772
.post_process_default()
776773
.plan();
@@ -803,6 +800,7 @@ impl<PdC: PdClient> Transaction<PdC> {
803800
let plan = PlanBuilder::new(rpc, api_version, request)
804801
.resolve_lock(retry_options.lock_backoff, api_version)
805802
.retry_multi_region(retry_options.region_backoff)
803+
.extract_error()
806804
.merge(Collect)
807805
.plan();
808806
plan.execute()
@@ -863,6 +861,7 @@ impl<PdC: PdClient> Transaction<PdC> {
863861
)
864862
.preserve_shard()
865863
.retry_multi_region_preserve_results(self.options.retry_options.region_backoff.clone())
864+
.extract_error()
866865
.merge(CollectWithShard)
867866
.plan();
868867
let pairs = plan.execute().await;
@@ -920,6 +919,7 @@ impl<PdC: PdClient> Transaction<PdC> {
920919
)
921920
.retry_multi_region(self.options.retry_options.region_backoff.clone())
922921
.extract_error()
922+
.extract_error()
923923
.plan();
924924
plan.execute().await?;
925925

@@ -989,6 +989,7 @@ impl<PdC: PdClient> Transaction<PdC> {
989989
let plan = PlanBuilder::new(rpc.clone(), api_version, request)
990990
.retry_multi_region(region_backoff.clone())
991991
.merge(CollectSingle)
992+
.extract_error()
992993
.plan();
993994
plan.execute().await?;
994995
}
@@ -1042,7 +1043,6 @@ impl<PdC: PdClient> Drop for Transaction<PdC> {
10421043
if self.get_status() == TransactionStatus::Active {
10431044
match self.options.check_level {
10441045
CheckLevel::Panic => {
1045-
dbg!(&self.timestamp);
10461046
panic!("Dropping an active transaction. Consider commit or rollback it.")
10471047
}
10481048
CheckLevel::Warn => {

tests/failpoint_tests.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -374,7 +374,7 @@ async fn must_rollbacked(client: &TransactionClient, keys: HashSet<Vec<u8>>) {
374374

375375
async fn count_locks(client: &TransactionClient) -> Result<usize> {
376376
let ts = client.current_timestamp().await.unwrap();
377-
let locks = client.scan_locks(&ts, vec![].., 1024).await?;
377+
let locks = client.scan_locks(&ts, .., 1024).await?;
378378
// De-duplicated as `scan_locks` will return duplicated locks due to retry on region changes.
379379
let locks_set: HashSet<Vec<u8>> = HashSet::from_iter(locks.into_iter().map(|l| l.key));
380380
Ok(locks_set.len())

0 commit comments

Comments
 (0)