Skip to content

Commit 8c94e39

Browse files
andylokandyekexium
andauthored
Add raw_coprocessor (#293)
* parallel requesting Signed-off-by: Andy Lok <andylokandy@hotmail.com> * improve Signed-off-by: Andy Lok <andylokandy@hotmail.com> * fix clippy Signed-off-by: Andy Lok <andylokandy@hotmail.com> * better Signed-off-by: Andy Lok <andylokandy@hotmail.com> * Add raw_coprocessor Signed-off-by: Andy Lok <andylokandy@hotmail.com> * Improve coprocessor experience with key range Signed-off-by: Andy Lok <andylokandy@hotmail.com> * Remove unused item Signed-off-by: Andy Lok <andylokandy@hotmail.com> * Run rustfmt Signed-off-by: Andy Lok <andylokandy@hotmail.com> * Fix bug Signed-off-by: Andy Lok <andylokandy@hotmail.com> * Check semver Signed-off-by: Andy Lok <andylokandy@hotmail.com> * Update kvproto Signed-off-by: Andy Lok <andylokandy@hotmail.com> * Fix kvproto Signed-off-by: Andy Lok <andylokandy@hotmail.com> * Address comment Signed-off-by: Andy Lok <andylokandy@hotmail.com> * improve preserve_shard Signed-off-by: Andy Lok <andylokandy@hotmail.com> * Revert get_shard Signed-off-by: Andy Lok <andylokandy@hotmail.com> * add unit test for coprocessor Signed-off-by: andylokandy <andylokandy@hotmail.com> * improve Signed-off-by: andylokandy <andylokandy@hotmail.com> * Apply suggestions from code review Signed-off-by: Andy Lok <andylokandy@hotmail.com> Co-authored-by: Ziqian Qin <ekexium@gmail.com> Co-authored-by: Ziqian Qin <ekexium@gmail.com>
1 parent c14f23a commit 8c94e39

31 files changed

+564
-440
lines changed

Cargo.toml

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@ name = "tikv_client"
2020
[dependencies]
2121
async-trait = "0.1"
2222
derive-new = "0.5"
23+
either = "1.6"
2324
fail = "0.4"
2425
futures = { version = "0.3", features = ["async-await", "thread-pool"] }
2526
futures-timer = "3.0"
@@ -29,6 +30,7 @@ log = "0.4"
2930
prometheus = { version = "0.12", features = [ "push", "process" ], default-features = false }
3031
rand = "0.8"
3132
regex = "1"
33+
semver = "0.11"
3234
serde = "1.0"
3335
serde_derive = "1.0"
3436
slog = { version = "2.3", features = ["max_level_trace", "release_max_level_debug"] }

mock-tikv/src/server.rs

Lines changed: 11 additions & 56 deletions
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,7 @@ use derive_new::new;
55
use futures::{FutureExt, TryFutureExt};
66
use grpcio::{Environment, Server, ServerBuilder};
77
use std::sync::Arc;
8-
use tikv_client_proto::{coprocessor_v2::*, kvrpcpb::*, tikvpb::*};
8+
use tikv_client_proto::{kvrpcpb::*, tikvpb::*};
99

1010
pub const MOCK_TIKV_PORT: u16 = 50019;
1111

@@ -271,60 +271,6 @@ impl Tikv for MockTikv {
271271
todo!()
272272
}
273273

274-
fn ver_get(
275-
&mut self,
276-
_ctx: grpcio::RpcContext,
277-
_req: tikv_client_proto::kvrpcpb::VerGetRequest,
278-
_sink: grpcio::UnarySink<tikv_client_proto::kvrpcpb::VerGetResponse>,
279-
) {
280-
todo!()
281-
}
282-
283-
fn ver_batch_get(
284-
&mut self,
285-
_ctx: grpcio::RpcContext,
286-
_req: tikv_client_proto::kvrpcpb::VerBatchGetRequest,
287-
_sink: grpcio::UnarySink<tikv_client_proto::kvrpcpb::VerBatchGetResponse>,
288-
) {
289-
todo!()
290-
}
291-
292-
fn ver_mut(
293-
&mut self,
294-
_ctx: grpcio::RpcContext,
295-
_req: tikv_client_proto::kvrpcpb::VerMutRequest,
296-
_sink: grpcio::UnarySink<tikv_client_proto::kvrpcpb::VerMutResponse>,
297-
) {
298-
todo!()
299-
}
300-
301-
fn ver_batch_mut(
302-
&mut self,
303-
_ctx: grpcio::RpcContext,
304-
_req: tikv_client_proto::kvrpcpb::VerBatchMutRequest,
305-
_sink: grpcio::UnarySink<tikv_client_proto::kvrpcpb::VerBatchMutResponse>,
306-
) {
307-
todo!()
308-
}
309-
310-
fn ver_scan(
311-
&mut self,
312-
_ctx: grpcio::RpcContext,
313-
_req: tikv_client_proto::kvrpcpb::VerScanRequest,
314-
_sink: grpcio::UnarySink<tikv_client_proto::kvrpcpb::VerScanResponse>,
315-
) {
316-
todo!()
317-
}
318-
319-
fn ver_delete_range(
320-
&mut self,
321-
_ctx: grpcio::RpcContext,
322-
_req: tikv_client_proto::kvrpcpb::VerDeleteRangeRequest,
323-
_sink: grpcio::UnarySink<tikv_client_proto::kvrpcpb::VerDeleteRangeResponse>,
324-
) {
325-
todo!()
326-
}
327-
328274
fn unsafe_destroy_range(
329275
&mut self,
330276
_ctx: grpcio::RpcContext,
@@ -532,7 +478,7 @@ impl Tikv for MockTikv {
532478
todo!()
533479
}
534480

535-
fn coprocessor_v2(
481+
fn raw_coprocessor(
536482
&mut self,
537483
_: grpcio::RpcContext,
538484
_: RawCoprocessorRequest,
@@ -549,4 +495,13 @@ impl Tikv for MockTikv {
549495
) {
550496
todo!()
551497
}
498+
499+
fn get_lock_wait_info(
500+
&mut self,
501+
_: grpcio::RpcContext,
502+
_: GetLockWaitInfoRequest,
503+
_: grpcio::UnarySink<GetLockWaitInfoResponse>,
504+
) {
505+
todo!()
506+
}
552507
}

src/mock.rs

Lines changed: 21 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -107,7 +107,7 @@ impl MockPdClient {
107107
pub fn region1() -> RegionWithLeader {
108108
let mut region = RegionWithLeader::default();
109109
region.region.id = 1;
110-
region.region.set_start_key(vec![0]);
110+
region.region.set_start_key(vec![]);
111111
region.region.set_end_key(vec![10]);
112112

113113
let leader = metapb::Peer {
@@ -133,6 +133,21 @@ impl MockPdClient {
133133

134134
region
135135
}
136+
137+
pub fn region3() -> RegionWithLeader {
138+
let mut region = RegionWithLeader::default();
139+
region.region.id = 3;
140+
region.region.set_start_key(vec![250, 250]);
141+
region.region.set_end_key(vec![]);
142+
143+
let leader = metapb::Peer {
144+
store_id: 43,
145+
..Default::default()
146+
};
147+
region.leader = Some(leader);
148+
149+
region
150+
}
136151
}
137152

138153
#[async_trait]
@@ -145,10 +160,12 @@ impl PdClient for MockPdClient {
145160

146161
async fn region_for_key(&self, key: &Key) -> Result<RegionWithLeader> {
147162
let bytes: &[_] = key.into();
148-
let region = if bytes.is_empty() || bytes[0] < 10 {
163+
let region = if bytes.is_empty() || bytes < &[10][..] {
149164
Self::region1()
150-
} else {
165+
} else if bytes >= &[10][..] && bytes < &[250, 250][..] {
151166
Self::region2()
167+
} else {
168+
Self::region3()
152169
};
153170

154171
Ok(region)
@@ -158,6 +175,7 @@ impl PdClient for MockPdClient {
158175
match id {
159176
1 => Ok(Self::region1()),
160177
2 => Ok(Self::region2()),
178+
3 => Ok(Self::region3()),
161179
_ => Err(Error::RegionNotFoundInResponse { region_id: id }),
162180
}
163181
}
@@ -180,10 +198,3 @@ impl PdClient for MockPdClient {
180198

181199
async fn invalidate_region_cache(&self, _ver_id: crate::region::RegionVerId) {}
182200
}
183-
184-
pub fn mock_store() -> RegionStore {
185-
RegionStore {
186-
region_with_leader: RegionWithLeader::default(),
187-
client: Arc::new(MockKvClient::new("foo".to_owned(), None)),
188-
}
189-
}

src/raw/client.rs

Lines changed: 115 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -1,17 +1,20 @@
11
// Copyright 2019 TiKV Project Authors. Licensed under Apache-2.0.
22

3+
use core::ops::Range;
4+
use std::{str::FromStr, sync::Arc, u32};
5+
6+
use slog::{Drain, Logger};
37
use tikv_client_common::Error;
8+
use tikv_client_proto::metapb;
49

510
use crate::{
611
backoff::DEFAULT_REGION_BACKOFF,
712
config::Config,
8-
pd::PdRpcClient,
13+
pd::{PdClient, PdRpcClient},
914
raw::lowering::*,
1015
request::{Collect, CollectSingle, Plan},
1116
BoundRange, ColumnFamily, Key, KvPair, Result, Value,
1217
};
13-
use slog::{Drain, Logger};
14-
use std::{sync::Arc, u32};
1518

1619
const MAX_RAW_KV_SCAN_LIMIT: u32 = 10240;
1720

@@ -23,15 +26,15 @@ const MAX_RAW_KV_SCAN_LIMIT: u32 = 10240;
2326
/// The returned results of raw request methods are [`Future`](std::future::Future)s that must be
2427
/// awaited to execute.
2528
#[derive(Clone)]
26-
pub struct Client {
27-
rpc: Arc<PdRpcClient>,
29+
pub struct Client<PdC: PdClient = PdRpcClient> {
30+
rpc: Arc<PdC>,
2831
cf: Option<ColumnFamily>,
2932
/// Whether to use the [`atomic mode`](Client::with_atomic_for_cas).
3033
atomic: bool,
3134
logger: Logger,
3235
}
3336

34-
impl Client {
37+
impl Client<PdRpcClient> {
3538
/// Create a raw [`Client`] and connect to the TiKV cluster.
3639
///
3740
/// Because TiKV is managed by a [PD](https://github.com/pingcap/pd/) cluster, the endpoints for
@@ -50,7 +53,7 @@ impl Client {
5053
pub async fn new<S: Into<String>>(
5154
pd_endpoints: Vec<S>,
5255
logger: Option<Logger>,
53-
) -> Result<Client> {
56+
) -> Result<Self> {
5457
Self::new_with_config(pd_endpoints, Config::default(), logger).await
5558
}
5659

@@ -78,7 +81,7 @@ impl Client {
7881
pd_endpoints: Vec<S>,
7982
config: Config,
8083
optional_logger: Option<Logger>,
81-
) -> Result<Client> {
84+
) -> Result<Self> {
8285
let logger = optional_logger.unwrap_or_else(|| {
8386
let plain = slog_term::PlainSyncDecorator::new(std::io::stdout());
8487
Logger::root(
@@ -125,7 +128,7 @@ impl Client {
125128
/// let get_request = client.get("foo".to_owned());
126129
/// # });
127130
/// ```
128-
pub fn with_cf(&self, cf: ColumnFamily) -> Client {
131+
pub fn with_cf(&self, cf: ColumnFamily) -> Self {
129132
Client {
130133
rpc: self.rpc.clone(),
131134
cf: Some(cf),
@@ -141,15 +144,17 @@ impl Client {
141144
/// the atomicity of CAS, write operations like [`put`](Client::put) or
142145
/// [`delete`](Client::delete) in atomic mode are more expensive. Some
143146
/// operations are not supported in the mode.
144-
pub fn with_atomic_for_cas(&self) -> Client {
147+
pub fn with_atomic_for_cas(&self) -> Self {
145148
Client {
146149
rpc: self.rpc.clone(),
147150
cf: self.cf.clone(),
148151
atomic: true,
149152
logger: self.logger.clone(),
150153
}
151154
}
155+
}
152156

157+
impl<PdC: PdClient> Client<PdC> {
153158
/// Create a new 'get' request.
154159
///
155160
/// Once resolved this request will result in the fetching of the value associated with the
@@ -517,6 +522,29 @@ impl Client {
517522
plan.execute().await
518523
}
519524

525+
pub async fn coprocessor(
526+
&self,
527+
copr_name: impl Into<String>,
528+
copr_version_req: impl Into<String>,
529+
ranges: impl IntoIterator<Item = impl Into<BoundRange>>,
530+
request_builder: impl Fn(metapb::Region, Vec<Range<Key>>) -> Vec<u8> + Send + Sync + 'static,
531+
) -> Result<Vec<(Vec<u8>, Vec<Range<Key>>)>> {
532+
let copr_version_req = copr_version_req.into();
533+
semver::VersionReq::from_str(&copr_version_req)?;
534+
let req = new_raw_coprocessor_request(
535+
copr_name.into(),
536+
copr_version_req,
537+
ranges.into_iter().map(Into::into),
538+
request_builder,
539+
);
540+
let plan = crate::request::PlanBuilder::new(self.rpc.clone(), req)
541+
.preserve_shard()
542+
.retry_multi_region(DEFAULT_REGION_BACKOFF)
543+
.post_process_default()
544+
.plan();
545+
plan.execute().await
546+
}
547+
520548
async fn scan_inner(
521549
&self,
522550
range: impl Into<BoundRange>,
@@ -576,3 +604,80 @@ impl Client {
576604
self.atomic.then(|| ()).ok_or(Error::UnsupportedMode)
577605
}
578606
}
607+
608+
#[cfg(test)]
609+
mod tests {
610+
use super::*;
611+
use crate::{
612+
mock::{MockKvClient, MockPdClient},
613+
Result,
614+
};
615+
use std::{any::Any, sync::Arc};
616+
use tikv_client_proto::kvrpcpb;
617+
618+
#[tokio::test]
619+
async fn test_raw_coprocessor() -> Result<()> {
620+
let plain = slog_term::PlainSyncDecorator::new(std::io::stdout());
621+
let logger = Logger::root(
622+
slog_term::FullFormat::new(plain)
623+
.build()
624+
.filter_level(slog::Level::Info)
625+
.fuse(),
626+
o!(),
627+
);
628+
let pd_client = Arc::new(MockPdClient::new(MockKvClient::with_dispatch_hook(
629+
move |req: &dyn Any| {
630+
if let Some(req) = req.downcast_ref::<kvrpcpb::RawCoprocessorRequest>() {
631+
assert_eq!(req.copr_name, "example");
632+
assert_eq!(req.copr_version_req, "0.1.0");
633+
let resp = kvrpcpb::RawCoprocessorResponse {
634+
data: req.data.clone(),
635+
..Default::default()
636+
};
637+
Ok(Box::new(resp) as Box<dyn Any>)
638+
} else {
639+
unreachable!()
640+
}
641+
},
642+
)));
643+
let client = Client {
644+
rpc: pd_client,
645+
cf: Some(ColumnFamily::Default),
646+
atomic: false,
647+
logger,
648+
};
649+
let resps = client
650+
.coprocessor(
651+
"example",
652+
"0.1.0",
653+
vec![vec![5]..vec![15], vec![20]..vec![]],
654+
|region, ranges| format!("{:?}:{:?}", region.id, ranges).into_bytes(),
655+
)
656+
.await?;
657+
let resps: Vec<_> = resps
658+
.into_iter()
659+
.map(|(data, ranges)| (String::from_utf8(data).unwrap(), ranges))
660+
.collect();
661+
assert_eq!(
662+
resps,
663+
vec![
664+
(
665+
"1:[Key(05)..Key(0A)]".to_string(),
666+
vec![Key::from(vec![5])..Key::from(vec![10])]
667+
),
668+
(
669+
"2:[Key(0A)..Key(0F), Key(14)..Key(FAFA)]".to_string(),
670+
vec![
671+
Key::from(vec![10])..Key::from(vec![15]),
672+
Key::from(vec![20])..Key::from(vec![250, 250])
673+
]
674+
),
675+
(
676+
"3:[Key(FAFA)..Key()]".to_string(),
677+
vec![Key::from(vec![250, 250])..Key::from(vec![])]
678+
)
679+
]
680+
);
681+
Ok(())
682+
}
683+
}

0 commit comments

Comments
 (0)