Skip to content

Commit aa6e4b5

Browse files
committed
cargo fmt
Signed-off-by: iosmanthus <myosmanthustree@gmail.com>
1 parent 1f314ee commit aa6e4b5

File tree

14 files changed

+364
-239
lines changed

14 files changed

+364
-239
lines changed

src/pd/client.rs

Lines changed: 19 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,6 @@
11
// Copyright 2018 TiKV Project Authors. Licensed under Apache-2.0.
22

3-
use std::{collections::HashMap, sync::Arc, thread};
4-
use std::marker::PhantomData;
3+
use std::{collections::HashMap, marker::PhantomData, sync::Arc, thread};
54

65
use async_trait::async_trait;
76
use futures::{prelude::*, stream::BoxStream};
@@ -14,15 +13,15 @@ use tikv_client_proto::{kvrpcpb, metapb};
1413
use tikv_client_store::{KvClient, KvConnect, TikvConnect};
1514

1615
use crate::{
17-
BoundRange,
1816
compat::stream_fn,
19-
Config,
20-
Key,
2117
kv::codec,
2218
pd::{retry::RetryClientTrait, RetryClient},
23-
region::{RegionId, RegionVerId, RegionWithLeader}, region_cache::RegionCache, Result, SecurityManager, store::RegionStore, Timestamp,
19+
region::{RegionId, RegionVerId, RegionWithLeader},
20+
region_cache::RegionCache,
21+
request::request_codec::RequestCodec,
22+
store::RegionStore,
23+
BoundRange, Config, Key, Result, SecurityManager, Timestamp,
2424
};
25-
use crate::request::request_codec::RequestCodec;
2625

2726
const CQ_COUNT: usize = 1;
2827
const CLIENT_PREFIX: &str = "tikv-client";
@@ -75,11 +74,11 @@ pub trait PdClient: Send + Sync + 'static {
7574

7675
fn group_keys_by_region<K, K2>(
7776
self: Arc<Self>,
78-
keys: impl Iterator<Item=K> + Send + Sync + 'static,
77+
keys: impl Iterator<Item = K> + Send + Sync + 'static,
7978
) -> BoxStream<'static, Result<(RegionId, Vec<K2>)>>
80-
where
81-
K: AsRef<Key> + Into<K2> + Send + Sync + 'static,
82-
K2: Send + Sync + 'static,
79+
where
80+
K: AsRef<Key> + Into<K2> + Send + Sync + 'static,
81+
K2: Send + Sync + 'static,
8382
{
8483
let keys = keys.peekable();
8584
stream_fn(keys, move |mut keys| {
@@ -101,7 +100,7 @@ pub trait PdClient: Send + Sync + 'static {
101100
}
102101
}
103102
})
104-
.boxed()
103+
.boxed()
105104
}
106105

107106
/// Returns a Stream which iterates over the contexts for each region covered by range.
@@ -132,7 +131,7 @@ pub trait PdClient: Send + Sync + 'static {
132131
Ok(Some((Some(region_end), store)))
133132
}
134133
})
135-
.boxed()
134+
.boxed()
136135
}
137136

138137
/// Returns a Stream which iterates over the contexts for ranges in the same region.
@@ -196,7 +195,7 @@ pub trait PdClient: Send + Sync + 'static {
196195
}
197196
}
198197
})
199-
.boxed()
198+
.boxed()
200199
}
201200

202201
fn decode_region(mut region: RegionWithLeader, enable_codec: bool) -> Result<RegionWithLeader> {
@@ -293,7 +292,7 @@ impl<C> PdRpcClient<C, TikvConnect, Cluster> {
293292
enable_codec,
294293
logger,
295294
)
296-
.await
295+
.await
297296
}
298297
}
299298

@@ -314,10 +313,10 @@ impl<C, KvC: KvConnect + Send + Sync + 'static, Cl> PdRpcClient<C, KvC, Cl> {
314313
enable_codec: bool,
315314
logger: Logger,
316315
) -> Result<PdRpcClient<C, KvC, Cl>>
317-
where
318-
PdFut: Future<Output=Result<RetryClient<Cl>>>,
319-
MakeKvC: FnOnce(Arc<Environment>, Arc<SecurityManager>) -> KvC,
320-
MakePd: FnOnce(Arc<Environment>, Arc<SecurityManager>) -> PdFut,
316+
where
317+
PdFut: Future<Output = Result<RetryClient<Cl>>>,
318+
MakeKvC: FnOnce(Arc<Environment>, Arc<SecurityManager>) -> KvC,
319+
MakePd: FnOnce(Arc<Environment>, Arc<SecurityManager>) -> PdFut,
321320
{
322321
let env = Arc::new(
323322
EnvBuilder::new()
@@ -327,7 +326,7 @@ impl<C, KvC: KvConnect + Send + Sync + 'static, Cl> PdRpcClient<C, KvC, Cl> {
327326
);
328327
let security_mgr = Arc::new(
329328
if let (Some(ca_path), Some(cert_path), Some(key_path)) =
330-
(&config.ca_path, &config.cert_path, &config.key_path)
329+
(&config.ca_path, &config.cert_path, &config.key_path)
331330
{
332331
SecurityManager::load(ca_path, cert_path, key_path)?
333332
} else {

src/raw/client.rs

Lines changed: 10 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -1,8 +1,7 @@
11
// Copyright 2019 TiKV Project Authors. Licensed under Apache-2.0.
22

33
use core::ops::Range;
4-
use std::{str::FromStr, sync::Arc, u32};
5-
use std::marker::PhantomData;
4+
use std::{marker::PhantomData, str::FromStr, sync::Arc, u32};
65

76
use slog::{Drain, Logger};
87
use tikv_client_common::Error;
@@ -13,10 +12,9 @@ use crate::{
1312
config::Config,
1413
pd::{PdClient, PdRpcClient},
1514
raw::lowering::*,
16-
request::{Collect, CollectSingle, Plan},
15+
request::{request_codec::RequestCodec, Collect, CollectSingle, Plan},
1716
Backoff, BoundRange, ColumnFamily, Key, KvPair, Result, Value,
1817
};
19-
use crate::request::request_codec::RequestCodec;
2018

2119
const MAX_RAW_KV_SCAN_LIMIT: u32 = 10240;
2220

@@ -106,7 +104,7 @@ impl<C: RequestCodec> Client<C, PdRpcClient<C>> {
106104
cf: None,
107105
atomic: false,
108106
logger,
109-
_phantom: PhantomData
107+
_phantom: PhantomData,
110108
})
111109
}
112110

@@ -141,7 +139,7 @@ impl<C: RequestCodec> Client<C, PdRpcClient<C>> {
141139
cf: Some(cf),
142140
atomic: self.atomic,
143141
logger: self.logger.clone(),
144-
_phantom: PhantomData
142+
_phantom: PhantomData,
145143
}
146144
}
147145

@@ -159,12 +157,12 @@ impl<C: RequestCodec> Client<C, PdRpcClient<C>> {
159157
cf: self.cf.clone(),
160158
atomic: true,
161159
logger: self.logger.clone(),
162-
_phantom: PhantomData
160+
_phantom: PhantomData,
163161
}
164162
}
165163
}
166164

167-
impl<C:RequestCodec, PdC: PdClient> Client<C, PdC> {
165+
impl<C: RequestCodec, PdC: PdClient> Client<C, PdC> {
168166
/// Create a new 'get' request.
169167
///
170168
/// Once resolved this request will result in the fetching of the value associated with the
@@ -217,7 +215,8 @@ impl<C:RequestCodec, PdC: PdClient> Client<C, PdC> {
217215
keys: impl IntoIterator<Item = impl Into<Key>>,
218216
) -> Result<Vec<KvPair>> {
219217
debug!(self.logger, "invoking raw batch_get request");
220-
let request = new_raw_batch_get_request::<C>(keys.into_iter().map(Into::into), self.cf.clone());
218+
let request =
219+
new_raw_batch_get_request::<C>(keys.into_iter().map(Into::into), self.cf.clone());
221220
let plan = crate::request::PlanBuilder::new(self.rpc.clone(), request)
222221
.retry_multi_region(DEFAULT_REGION_BACKOFF)
223222
.merge(Collect)
@@ -245,7 +244,8 @@ impl<C:RequestCodec, PdC: PdClient> Client<C, PdC> {
245244
/// ```
246245
pub async fn put(&self, key: impl Into<Key>, value: impl Into<Value>) -> Result<()> {
247246
debug!(self.logger, "invoking raw put request");
248-
let request = new_raw_put_request::<C>(key.into(), value.into(), self.cf.clone(), self.atomic);
247+
let request =
248+
new_raw_put_request::<C>(key.into(), value.into(), self.cf.clone(), self.atomic);
249249
let plan = crate::request::PlanBuilder::new(self.rpc.clone(), request)
250250
.retry_multi_region(DEFAULT_REGION_BACKOFF)
251251
.merge(CollectSingle)

src/raw/lowering.rs

Lines changed: 37 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -4,27 +4,31 @@
44
//! types (i.e., the types from the client crate) and converts these to the types used in the
55
//! generated protobuf code, then calls the low-level ctor functions in the requests module.
66
7-
use std::{iter::Iterator, ops::Range, sync::Arc};
8-
use std::marker::PhantomData;
7+
use std::{iter::Iterator, marker::PhantomData, ops::Range, sync::Arc};
98

109
use tikv_client_proto::{kvrpcpb, metapb};
1110

12-
use crate::{BoundRange, ColumnFamily, Key, KvPair, raw::requests, Value};
13-
use crate::request::KvRequest;
14-
use crate::request::request_codec::RequestCodec;
11+
use crate::{
12+
raw::requests,
13+
request::{request_codec::RequestCodec, KvRequest},
14+
BoundRange, ColumnFamily, Key, KvPair, Value,
15+
};
1516

16-
pub fn new_raw_get_request<C:RequestCodec>(key: Key, cf: Option<ColumnFamily>) -> kvrpcpb::RawGetRequest {
17+
pub fn new_raw_get_request<C: RequestCodec>(
18+
key: Key,
19+
cf: Option<ColumnFamily>,
20+
) -> kvrpcpb::RawGetRequest {
1721
requests::new_raw_get_request::<C>(key.into(), cf)
1822
}
1923

20-
pub fn new_raw_batch_get_request<C:RequestCodec>(
21-
keys: impl Iterator<Item=Key>,
24+
pub fn new_raw_batch_get_request<C: RequestCodec>(
25+
keys: impl Iterator<Item = Key>,
2226
cf: Option<ColumnFamily>,
2327
) -> kvrpcpb::RawBatchGetRequest {
2428
requests::new_raw_batch_get_request::<C>(keys.map(Into::into).collect(), cf)
2529
}
2630

27-
pub fn new_raw_put_request<C:RequestCodec>(
31+
pub fn new_raw_put_request<C: RequestCodec>(
2832
key: Key,
2933
value: Value,
3034
cf: Option<ColumnFamily>,
@@ -33,43 +37,47 @@ pub fn new_raw_put_request<C:RequestCodec>(
3337
requests::new_raw_put_request::<C>(key.into(), value, cf, atomic)
3438
}
3539

36-
pub fn new_raw_batch_put_request<C:RequestCodec>(
37-
pairs: impl Iterator<Item=KvPair>,
40+
pub fn new_raw_batch_put_request<C: RequestCodec>(
41+
pairs: impl Iterator<Item = KvPair>,
3842
cf: Option<ColumnFamily>,
3943
atomic: bool,
4044
) -> kvrpcpb::RawBatchPutRequest {
4145
requests::new_raw_batch_put_request::<C>(pairs.map(Into::into).collect(), cf, atomic)
4246
}
4347

44-
pub fn new_raw_delete_request<C:RequestCodec>(
48+
pub fn new_raw_delete_request<C: RequestCodec>(
4549
key: Key,
4650
cf: Option<ColumnFamily>,
4751
atomic: bool,
4852
) -> kvrpcpb::RawDeleteRequest {
4953
requests::new_raw_delete_request::<C>(key.into(), cf, atomic)
5054
}
5155

52-
pub fn new_raw_batch_delete_request<C:RequestCodec>(
53-
keys: impl Iterator<Item=Key>,
56+
pub fn new_raw_batch_delete_request<C: RequestCodec>(
57+
keys: impl Iterator<Item = Key>,
5458
cf: Option<ColumnFamily>,
5559
) -> kvrpcpb::RawBatchDeleteRequest {
5660
requests::new_raw_batch_delete_request::<C>(keys.map(Into::into).collect(), cf)
5761
}
5862

59-
pub fn new_raw_delete_range_request<C:RequestCodec>(
63+
pub fn new_raw_delete_range_request<C: RequestCodec>(
6064
range: BoundRange,
6165
cf: Option<ColumnFamily>,
6266
) -> kvrpcpb::RawDeleteRangeRequest {
6367
let (start_key, end_key) = range.into_keys();
64-
requests::new_raw_delete_range_request::<C>(start_key.into(), end_key.unwrap_or_default().into(), cf)
68+
requests::new_raw_delete_range_request::<C>(
69+
start_key.into(),
70+
end_key.unwrap_or_default().into(),
71+
cf,
72+
)
6573
}
6674

67-
pub fn new_raw_scan_request<C:RequestCodec>(
75+
pub fn new_raw_scan_request<C: RequestCodec>(
6876
range: BoundRange,
6977
limit: u32,
7078
key_only: bool,
7179
cf: Option<ColumnFamily>,
72-
) -> kvrpcpb::RawScanRequest {
80+
) -> kvrpcpb::RawScanRequest {
7381
let (start_key, end_key) = range.into_keys();
7482
requests::new_raw_scan_request::<C>(
7583
start_key.into(),
@@ -80,16 +88,21 @@ pub fn new_raw_scan_request<C:RequestCodec>(
8088
)
8189
}
8290

83-
pub fn new_raw_batch_scan_request<C:RequestCodec>(
84-
ranges: impl Iterator<Item=BoundRange>,
91+
pub fn new_raw_batch_scan_request<C: RequestCodec>(
92+
ranges: impl Iterator<Item = BoundRange>,
8593
each_limit: u32,
8694
key_only: bool,
8795
cf: Option<ColumnFamily>,
8896
) -> kvrpcpb::RawBatchScanRequest {
89-
requests::new_raw_batch_scan_request::<C>(ranges.map(Into::into).collect(), each_limit, key_only, cf)
97+
requests::new_raw_batch_scan_request::<C>(
98+
ranges.map(Into::into).collect(),
99+
each_limit,
100+
key_only,
101+
cf,
102+
)
90103
}
91104

92-
pub fn new_cas_request<C:RequestCodec>(
105+
pub fn new_cas_request<C: RequestCodec>(
93106
key: Key,
94107
value: Value,
95108
previous_value: Option<Value>,
@@ -98,10 +111,10 @@ pub fn new_cas_request<C:RequestCodec>(
98111
requests::new_cas_request::<C>(key.into(), value, previous_value, cf)
99112
}
100113

101-
pub fn new_raw_coprocessor_request<C:RequestCodec>(
114+
pub fn new_raw_coprocessor_request<C: RequestCodec>(
102115
copr_name: String,
103116
copr_version_req: String,
104-
ranges: impl Iterator<Item=BoundRange>,
117+
ranges: impl Iterator<Item = BoundRange>,
105118
request_builder: impl Fn(metapb::Region, Vec<Range<Key>>) -> Vec<u8> + Send + Sync + 'static,
106119
) -> requests::RawCoprocessorRequest {
107120
requests::new_raw_coprocessor_request::<C>(

0 commit comments

Comments
 (0)