Skip to content

Commit 20318ab

Browse files
authored
Merge pull request #262 from ekexium/atomic
Support atomic operations (CAS, put, and delete)
2 parents f3ce6d7 + c24865b commit 20318ab

File tree

22 files changed

+394
-27
lines changed

22 files changed

+394
-27
lines changed

mock-tikv/src/pd.rs

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -336,11 +336,11 @@ impl Pd for MockPd {
336336
todo!()
337337
}
338338

339-
fn get_dc_locations(
339+
fn get_dc_location_info(
340340
&mut self,
341-
_: ::grpcio::RpcContext<'_>,
342-
_: GetDcLocationsRequest,
343-
_: ::grpcio::UnarySink<GetDcLocationsResponse>,
341+
_: grpcio::RpcContext,
342+
_: GetDcLocationInfoRequest,
343+
_: grpcio::UnarySink<GetDcLocationInfoResponse>,
344344
) {
345345
todo!()
346346
}

mock-tikv/src/server.rs

Lines changed: 30 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,7 @@ use derive_new::new;
55
use futures::{FutureExt, TryFutureExt};
66
use grpcio::{Environment, Server, ServerBuilder};
77
use std::sync::Arc;
8-
use tikv_client_proto::{kvrpcpb::*, tikvpb::*};
8+
use tikv_client_proto::{coprocessor_v2::*, kvrpcpb::*, tikvpb::*};
99

1010
pub const MOCK_TIKV_PORT: u16 = 50019;
1111

@@ -514,11 +514,38 @@ impl Tikv for MockTikv {
514514
todo!()
515515
}
516516

517+
fn raw_get_key_ttl(
518+
&mut self,
519+
_: grpcio::RpcContext,
520+
_: RawGetKeyTtlRequest,
521+
_: grpcio::UnarySink<RawGetKeyTtlResponse>,
522+
) {
523+
todo!()
524+
}
525+
526+
fn raw_compare_and_swap(
527+
&mut self,
528+
_: grpcio::RpcContext,
529+
_: RawCasRequest,
530+
_: grpcio::UnarySink<RawCasResponse>,
531+
) {
532+
todo!()
533+
}
534+
517535
fn coprocessor_v2(
518536
&mut self,
519537
_: grpcio::RpcContext,
520-
_: tikv_client_proto::coprocessor_v2::RawCoprocessorRequest,
521-
_: grpcio::UnarySink<tikv_client_proto::coprocessor_v2::RawCoprocessorResponse>,
538+
_: RawCoprocessorRequest,
539+
_: grpcio::UnarySink<RawCoprocessorResponse>,
540+
) {
541+
todo!()
542+
}
543+
544+
fn get_store_safe_ts(
545+
&mut self,
546+
_: grpcio::RpcContext,
547+
_: StoreSafeTsRequest,
548+
_: grpcio::UnarySink<StoreSafeTsResponse>,
522549
) {
523550
todo!()
524551
}

src/raw/client.rs

Lines changed: 74 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,8 @@ const MAX_RAW_KV_SCAN_LIMIT: u32 = 10240;
2424
pub struct Client {
2525
rpc: Arc<PdRpcClient>,
2626
cf: Option<ColumnFamily>,
27+
/// Whether to use the [`atomic mode`](Client::with_atomic_for_cas).
28+
atomic: bool,
2729
}
2830

2931
impl Client {
@@ -63,7 +65,11 @@ impl Client {
6365
) -> Result<Client> {
6466
let pd_endpoints: Vec<String> = pd_endpoints.into_iter().map(Into::into).collect();
6567
let rpc = Arc::new(PdRpcClient::connect(&pd_endpoints, &config, false).await?);
66-
Ok(Client { rpc, cf: None })
68+
Ok(Client {
69+
rpc,
70+
cf: None,
71+
atomic: false,
72+
})
6773
}
6874

6975
/// Set the column family of requests.
@@ -89,6 +95,22 @@ impl Client {
8995
Client {
9096
rpc: self.rpc.clone(),
9197
cf: Some(cf),
98+
atomic: self.atomic,
99+
}
100+
}
101+
102+
/// Set to use the atomic mode.
103+
///
104+
/// The only reason of using atomic mode is the
105+
/// [`compare_and_swap`](Client::compare_and_swap) operation. To guarantee
106+
/// the atomicity of CAS, write operations like [`put`](Client::put) or
107+
/// [`delete`](Client::delete) in atomic mode are more expensive. Some
108+
/// operations are not supported in the mode.
109+
pub fn with_atomic_for_cas(&self) -> Client {
110+
Client {
111+
rpc: self.rpc.clone(),
112+
cf: self.cf.clone(),
113+
atomic: true,
92114
}
93115
}
94116

@@ -171,7 +193,7 @@ impl Client {
171193
/// # });
172194
/// ```
173195
pub async fn put(&self, key: impl Into<Key>, value: impl Into<Value>) -> Result<()> {
174-
let request = new_raw_put_request(key.into(), value.into(), self.cf.clone());
196+
let request = new_raw_put_request(key.into(), value.into(), self.cf.clone(), self.atomic);
175197
let plan = crate::request::PlanBuilder::new(self.rpc.clone(), request)
176198
.single_region()
177199
.await?
@@ -203,7 +225,11 @@ impl Client {
203225
&self,
204226
pairs: impl IntoIterator<Item = impl Into<KvPair>>,
205227
) -> Result<()> {
206-
let request = new_raw_batch_put_request(pairs.into_iter().map(Into::into), self.cf.clone());
228+
let request = new_raw_batch_put_request(
229+
pairs.into_iter().map(Into::into),
230+
self.cf.clone(),
231+
self.atomic,
232+
);
207233
let plan = crate::request::PlanBuilder::new(self.rpc.clone(), request)
208234
.multi_region()
209235
.retry_region(DEFAULT_REGION_BACKOFF)
@@ -231,7 +257,7 @@ impl Client {
231257
/// # });
232258
/// ```
233259
pub async fn delete(&self, key: impl Into<Key>) -> Result<()> {
234-
let request = new_raw_delete_request(key.into(), self.cf.clone());
260+
let request = new_raw_delete_request(key.into(), self.cf.clone(), self.atomic);
235261
let plan = crate::request::PlanBuilder::new(self.rpc.clone(), request)
236262
.single_region()
237263
.await?
@@ -260,6 +286,7 @@ impl Client {
260286
/// # });
261287
/// ```
262288
pub async fn batch_delete(&self, keys: impl IntoIterator<Item = impl Into<Key>>) -> Result<()> {
289+
self.assert_non_atomic()?;
263290
let request =
264291
new_raw_batch_delete_request(keys.into_iter().map(Into::into), self.cf.clone());
265292
let plan = crate::request::PlanBuilder::new(self.rpc.clone(), request)
@@ -287,6 +314,7 @@ impl Client {
287314
/// # });
288315
/// ```
289316
pub async fn delete_range(&self, range: impl Into<BoundRange>) -> Result<()> {
317+
self.assert_non_atomic()?;
290318
let request = new_raw_delete_range_request(range.into(), self.cf.clone());
291319
let plan = crate::request::PlanBuilder::new(self.rpc.clone(), request)
292320
.multi_region()
@@ -415,6 +443,40 @@ impl Client {
415443
.collect())
416444
}
417445

446+
/// Create a new *atomic* 'compare and set' request.
447+
///
448+
/// Once resolved this request will result in an atomic `compare and set'
449+
/// operation for the given key.
450+
///
451+
/// If the value retrived is equal to `current_value`, `new_value` is
452+
/// written.
453+
///
454+
/// # Return Value
455+
///
456+
/// A tuple is returned if successful: the previous value and whether the
457+
/// value is swapped
458+
pub async fn compare_and_swap(
459+
&self,
460+
key: impl Into<Key>,
461+
previous_value: impl Into<Option<Value>>,
462+
new_value: impl Into<Value>,
463+
) -> Result<(Option<Value>, bool)> {
464+
self.assert_atomic()?;
465+
let req = new_cas_request(
466+
key.into(),
467+
new_value.into(),
468+
previous_value.into(),
469+
self.cf.clone(),
470+
);
471+
let plan = crate::request::PlanBuilder::new(self.rpc.clone(), req)
472+
.single_region()
473+
.await?
474+
.retry_region(DEFAULT_REGION_BACKOFF)
475+
.post_process_default()
476+
.plan();
477+
plan.execute().await
478+
}
479+
418480
async fn scan_inner(
419481
&self,
420482
range: impl Into<BoundRange>,
@@ -467,4 +529,12 @@ impl Client {
467529
.plan();
468530
plan.execute().await
469531
}
532+
533+
fn assert_non_atomic(&self) -> Result<()> {
534+
(!self.atomic).then(|| ()).ok_or(Error::UnsupportedMode)
535+
}
536+
537+
fn assert_atomic(&self) -> Result<()> {
538+
self.atomic.then(|| ()).ok_or(Error::UnsupportedMode)
539+
}
470540
}

src/raw/lowering.rs

Lines changed: 19 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -22,19 +22,25 @@ pub fn new_raw_put_request(
2222
key: Key,
2323
value: Value,
2424
cf: Option<ColumnFamily>,
25+
atomic: bool,
2526
) -> kvrpcpb::RawPutRequest {
26-
requests::new_raw_put_request(key.into(), value, cf)
27+
requests::new_raw_put_request(key.into(), value, cf, atomic)
2728
}
2829

2930
pub fn new_raw_batch_put_request(
3031
pairs: impl Iterator<Item = KvPair>,
3132
cf: Option<ColumnFamily>,
33+
atomic: bool,
3234
) -> kvrpcpb::RawBatchPutRequest {
33-
requests::new_raw_batch_put_request(pairs.map(Into::into).collect(), cf)
35+
requests::new_raw_batch_put_request(pairs.map(Into::into).collect(), cf, atomic)
3436
}
3537

36-
pub fn new_raw_delete_request(key: Key, cf: Option<ColumnFamily>) -> kvrpcpb::RawDeleteRequest {
37-
requests::new_raw_delete_request(key.into(), cf)
38+
pub fn new_raw_delete_request(
39+
key: Key,
40+
cf: Option<ColumnFamily>,
41+
atomic: bool,
42+
) -> kvrpcpb::RawDeleteRequest {
43+
requests::new_raw_delete_request(key.into(), cf, atomic)
3844
}
3945

4046
pub fn new_raw_batch_delete_request(
@@ -76,3 +82,12 @@ pub fn new_raw_batch_scan_request(
7682
) -> kvrpcpb::RawBatchScanRequest {
7783
requests::new_raw_batch_scan_request(ranges.map(Into::into).collect(), each_limit, key_only, cf)
7884
}
85+
86+
pub fn new_cas_request(
87+
key: Key,
88+
value: Value,
89+
previous_value: Option<Value>,
90+
cf: Option<ColumnFamily>,
91+
) -> kvrpcpb::RawCasRequest {
92+
requests::new_cas_request(key.into(), value, previous_value, cf)
93+
}

src/raw/requests.rs

Lines changed: 52 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -76,11 +76,13 @@ pub fn new_raw_put_request(
7676
key: Vec<u8>,
7777
value: Vec<u8>,
7878
cf: Option<ColumnFamily>,
79+
atomic: bool,
7980
) -> kvrpcpb::RawPutRequest {
8081
let mut req = kvrpcpb::RawPutRequest::default();
8182
req.set_key(key);
8283
req.set_value(value);
8384
req.maybe_set_cf(cf);
85+
req.set_for_cas(atomic);
8486

8587
req
8688
}
@@ -98,10 +100,12 @@ impl SingleKey for kvrpcpb::RawPutRequest {
98100
pub fn new_raw_batch_put_request(
99101
pairs: Vec<kvrpcpb::KvPair>,
100102
cf: Option<ColumnFamily>,
103+
atomic: bool,
101104
) -> kvrpcpb::RawBatchPutRequest {
102105
let mut req = kvrpcpb::RawBatchPutRequest::default();
103106
req.set_pairs(pairs);
104107
req.maybe_set_cf(cf);
108+
req.set_for_cas(atomic);
105109

106110
req
107111
}
@@ -132,10 +136,15 @@ impl Shardable for kvrpcpb::RawBatchPutRequest {
132136
}
133137
}
134138

135-
pub fn new_raw_delete_request(key: Vec<u8>, cf: Option<ColumnFamily>) -> kvrpcpb::RawDeleteRequest {
139+
pub fn new_raw_delete_request(
140+
key: Vec<u8>,
141+
cf: Option<ColumnFamily>,
142+
atomic: bool,
143+
) -> kvrpcpb::RawDeleteRequest {
136144
let mut req = kvrpcpb::RawDeleteRequest::default();
137145
req.set_key(key);
138146
req.maybe_set_cf(cf);
147+
req.set_for_cas(atomic);
139148

140149
req
141150
}
@@ -267,6 +276,46 @@ impl Merge<kvrpcpb::RawBatchScanResponse> for Collect {
267276
}
268277
}
269278

279+
pub fn new_cas_request(
280+
key: Vec<u8>,
281+
value: Vec<u8>,
282+
previous_value: Option<Vec<u8>>,
283+
cf: Option<ColumnFamily>,
284+
) -> kvrpcpb::RawCasRequest {
285+
let mut req = kvrpcpb::RawCasRequest::default();
286+
req.set_key(key);
287+
req.set_value(value);
288+
match previous_value {
289+
Some(v) => req.set_previous_value(v),
290+
None => req.set_previous_not_exist(true),
291+
}
292+
req.maybe_set_cf(cf);
293+
req
294+
}
295+
296+
impl KvRequest for kvrpcpb::RawCasRequest {
297+
type Response = kvrpcpb::RawCasResponse;
298+
}
299+
300+
impl SingleKey for kvrpcpb::RawCasRequest {
301+
fn key(&self) -> &Vec<u8> {
302+
&self.key
303+
}
304+
}
305+
306+
impl Process<kvrpcpb::RawCasResponse> for DefaultProcessor {
307+
type Out = (Option<Value>, bool); // (previous_value, swapped)
308+
309+
fn process(&self, input: Result<kvrpcpb::RawCasResponse>) -> Result<Self::Out> {
310+
let input = input?;
311+
if input.previous_not_exist {
312+
Ok((None, input.succeed))
313+
} else {
314+
Ok((Some(input.previous_value), input.succeed))
315+
}
316+
}
317+
}
318+
270319
macro_rules! impl_raw_rpc_request {
271320
($name: ident) => {
272321
impl RawRpcRequest for kvrpcpb::$name {
@@ -286,6 +335,7 @@ impl_raw_rpc_request!(RawBatchDeleteRequest);
286335
impl_raw_rpc_request!(RawScanRequest);
287336
impl_raw_rpc_request!(RawBatchScanRequest);
288337
impl_raw_rpc_request!(RawDeleteRangeRequest);
338+
impl_raw_rpc_request!(RawCasRequest);
289339

290340
impl HasLocks for kvrpcpb::RawGetResponse {}
291341
impl HasLocks for kvrpcpb::RawBatchGetResponse {}
@@ -296,6 +346,7 @@ impl HasLocks for kvrpcpb::RawBatchDeleteResponse {}
296346
impl HasLocks for kvrpcpb::RawScanResponse {}
297347
impl HasLocks for kvrpcpb::RawBatchScanResponse {}
298348
impl HasLocks for kvrpcpb::RawDeleteRangeResponse {}
349+
impl HasLocks for kvrpcpb::RawCasResponse {}
299350

300351
#[cfg(test)]
301352
mod test {

src/request/plan.rs

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -276,8 +276,11 @@ impl<P: Plan + HasKeys, PdC: PdClient> HasKeys for ResolveLock<P, PdC> {
276276
}
277277
}
278278

279-
/// When executed, the plan extracts errors from its inner plan, and
280-
/// returns an `Err` wrapping the error.
279+
/// When executed, the plan extracts errors from its inner plan, and returns an
280+
/// `Err` wrapping the error.
281+
///
282+
/// We usually need to apply this plan if (and only if) the output of the inner
283+
/// plan is of a response type.
281284
///
282285
/// The errors come from two places: `Err` from inner plans, and `Ok(response)`
283286
/// where `response` contains unresolved errors (`error` and `region_error`).

src/timestamp.rs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -35,6 +35,8 @@ impl TimestampExt for Timestamp {
3535
Self {
3636
physical: version >> PHYSICAL_SHIFT_BITS,
3737
logical: version & LOGICAL_MASK,
38+
// Now we only support global transactions
39+
suffix_bits: 0,
3840
}
3941
}
4042

0 commit comments

Comments
 (0)