Skip to content

Commit 222908e

Browse files
committed
git fire: changing sig for PdRpcClient
Signed-off-by: iosmanthus <myosmanthustree@gmail.com>
1 parent aa6e4b5 commit 222908e

File tree

3 files changed

+37
-33
lines changed

3 files changed

+37
-33
lines changed

src/pd/client.rs

Lines changed: 5 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -219,11 +219,9 @@ pub struct PdRpcClient<C, KvC: KvConnect + Send + Sync + 'static = TikvConnect,
219219
pd: Arc<RetryClient<Cl>>,
220220
kv_connect: KvC,
221221
kv_client_cache: Arc<RwLock<HashMap<String, KvC::KvClient>>>,
222-
enable_codec: bool,
223222
region_cache: RegionCache<RetryClient<Cl>>,
224223
logger: Logger,
225-
// TODO: change to a real codec.
226-
_phantom: PhantomData<C>,
224+
codec: C,
227225
}
228226

229227
#[async_trait]
@@ -280,7 +278,7 @@ impl<C> PdRpcClient<C, TikvConnect, Cluster> {
280278
pub async fn connect(
281279
pd_endpoints: &[String],
282280
config: Config,
283-
enable_codec: bool,
281+
codec: C,
284282
logger: Logger,
285283
) -> Result<PdRpcClient<C, TikvConnect, Cluster>> {
286284
PdRpcClient::new(
@@ -289,7 +287,7 @@ impl<C> PdRpcClient<C, TikvConnect, Cluster> {
289287
|env, security_mgr| {
290288
RetryClient::connect(env, pd_endpoints, security_mgr, config.timeout)
291289
},
292-
enable_codec,
290+
codec,
293291
logger,
294292
)
295293
.await
@@ -310,7 +308,7 @@ impl<C, KvC: KvConnect + Send + Sync + 'static, Cl> PdRpcClient<C, KvC, Cl> {
310308
config: Config,
311309
kv_connect: MakeKvC,
312310
pd: MakePd,
313-
enable_codec: bool,
311+
codec: C,
314312
logger: Logger,
315313
) -> Result<PdRpcClient<C, KvC, Cl>>
316314
where
@@ -340,11 +338,9 @@ impl<C, KvC: KvConnect + Send + Sync + 'static, Cl> PdRpcClient<C, KvC, Cl> {
340338
pd: pd.clone(),
341339
kv_client_cache,
342340
kv_connect: kv_connect(env, security_mgr),
343-
enable_codec,
344341
region_cache: RegionCache::new(pd),
345342
logger,
346-
// TODO
347-
_phantom: PhantomData,
343+
codec
348344
})
349345
}
350346

src/raw/client.rs

Lines changed: 24 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -4,16 +4,17 @@ use core::ops::Range;
44
use std::{marker::PhantomData, str::FromStr, sync::Arc, u32};
55

66
use slog::{Drain, Logger};
7+
78
use tikv_client_common::Error;
89
use tikv_client_proto::metapb;
910

1011
use crate::{
12+
Backoff,
1113
backoff::DEFAULT_REGION_BACKOFF,
14+
BoundRange,
15+
ColumnFamily,
1216
config::Config,
13-
pd::{PdClient, PdRpcClient},
14-
raw::lowering::*,
15-
request::{request_codec::RequestCodec, Collect, CollectSingle, Plan},
16-
Backoff, BoundRange, ColumnFamily, Key, KvPair, Result, Value,
17+
Key, KvPair, pd::{PdClient, PdRpcClient}, raw::lowering::*, request::{Collect, CollectSingle, Plan, request_codec::RequestCodec}, Result, Value,
1718
};
1819

1920
const MAX_RAW_KV_SCAN_LIMIT: u32 = 10240;
@@ -53,9 +54,10 @@ impl<C: RequestCodec> Client<C, PdRpcClient<C>> {
5354
/// ```
5455
pub async fn new<S: Into<String>>(
5556
pd_endpoints: Vec<S>,
57+
codec: C,
5658
logger: Option<Logger>,
5759
) -> Result<Self> {
58-
Self::new_with_config(pd_endpoints, Config::default(), logger).await
60+
Self::new_with_config(pd_endpoints, Config::default(), codec, logger).await
5961
}
6062

6163
/// Create a raw [`Client`] with a custom configuration, and connect to the TiKV cluster.
@@ -83,6 +85,7 @@ impl<C: RequestCodec> Client<C, PdRpcClient<C>> {
8385
pub async fn new_with_config<S: Into<String>>(
8486
pd_endpoints: Vec<S>,
8587
config: Config,
88+
codec: C,
8689
optional_logger: Option<Logger>,
8790
) -> Result<Self> {
8891
let logger = optional_logger.unwrap_or_else(|| {
@@ -98,7 +101,7 @@ impl<C: RequestCodec> Client<C, PdRpcClient<C>> {
98101
debug!(logger, "creating new raw client");
99102
let pd_endpoints: Vec<String> = pd_endpoints.into_iter().map(Into::into).collect();
100103
let rpc =
101-
Arc::new(PdRpcClient::connect(&pd_endpoints, config, false, logger.clone()).await?);
104+
Arc::new(PdRpcClient::connect(&pd_endpoints, config, codec, logger.clone()).await?);
102105
Ok(Client {
103106
rpc,
104107
cf: None,
@@ -212,7 +215,7 @@ impl<C: RequestCodec, PdC: PdClient> Client<C, PdC> {
212215
/// ```
213216
pub async fn batch_get(
214217
&self,
215-
keys: impl IntoIterator<Item = impl Into<Key>>,
218+
keys: impl IntoIterator<Item=impl Into<Key>>,
216219
) -> Result<Vec<KvPair>> {
217220
debug!(self.logger, "invoking raw batch_get request");
218221
let request =
@@ -274,7 +277,7 @@ impl<C: RequestCodec, PdC: PdClient> Client<C, PdC> {
274277
/// ```
275278
pub async fn batch_put(
276279
&self,
277-
pairs: impl IntoIterator<Item = impl Into<KvPair>>,
280+
pairs: impl IntoIterator<Item=impl Into<KvPair>>,
278281
) -> Result<()> {
279282
debug!(self.logger, "invoking raw batch_put request");
280283
let request = new_raw_batch_put_request::<C>(
@@ -336,7 +339,7 @@ impl<C: RequestCodec, PdC: PdClient> Client<C, PdC> {
336339
/// let result: () = req.await.unwrap();
337340
/// # });
338341
/// ```
339-
pub async fn batch_delete(&self, keys: impl IntoIterator<Item = impl Into<Key>>) -> Result<()> {
342+
pub async fn batch_delete(&self, keys: impl IntoIterator<Item=impl Into<Key>>) -> Result<()> {
340343
debug!(self.logger, "invoking raw batch_delete request");
341344
self.assert_non_atomic()?;
342345
let request =
@@ -462,7 +465,7 @@ impl<C: RequestCodec, PdC: PdClient> Client<C, PdC> {
462465
/// ```
463466
pub async fn batch_scan(
464467
&self,
465-
ranges: impl IntoIterator<Item = impl Into<BoundRange>>,
468+
ranges: impl IntoIterator<Item=impl Into<BoundRange>>,
466469
each_limit: u32,
467470
) -> Result<Vec<KvPair>> {
468471
debug!(self.logger, "invoking raw batch_scan request");
@@ -494,7 +497,7 @@ impl<C: RequestCodec, PdC: PdClient> Client<C, PdC> {
494497
/// ```
495498
pub async fn batch_scan_keys(
496499
&self,
497-
ranges: impl IntoIterator<Item = impl Into<BoundRange>>,
500+
ranges: impl IntoIterator<Item=impl Into<BoundRange>>,
498501
each_limit: u32,
499502
) -> Result<Vec<Key>> {
500503
debug!(self.logger, "invoking raw batch_scan_keys request");
@@ -544,7 +547,7 @@ impl<C: RequestCodec, PdC: PdClient> Client<C, PdC> {
544547
&self,
545548
copr_name: impl Into<String>,
546549
copr_version_req: impl Into<String>,
547-
ranges: impl IntoIterator<Item = impl Into<BoundRange>>,
550+
ranges: impl IntoIterator<Item=impl Into<BoundRange>>,
548551
request_builder: impl Fn(metapb::Region, Vec<Range<Key>>) -> Vec<u8> + Send + Sync + 'static,
549552
) -> Result<Vec<(Vec<u8>, Vec<Range<Key>>)>> {
550553
let copr_version_req = copr_version_req.into();
@@ -590,7 +593,7 @@ impl<C: RequestCodec, PdC: PdClient> Client<C, PdC> {
590593

591594
async fn batch_scan_inner(
592595
&self,
593-
ranges: impl IntoIterator<Item = impl Into<BoundRange>>,
596+
ranges: impl IntoIterator<Item=impl Into<BoundRange>>,
594597
each_limit: u32,
595598
key_only: bool,
596599
) -> Result<Vec<KvPair>> {
@@ -625,13 +628,16 @@ impl<C: RequestCodec, PdC: PdClient> Client<C, PdC> {
625628

626629
#[cfg(test)]
627630
mod tests {
628-
use super::*;
631+
use std::{any::Any, sync::Arc};
632+
633+
use tikv_client_proto::kvrpcpb;
634+
629635
use crate::{
630636
mock::{MockKvClient, MockPdClient},
631637
Result,
632638
};
633-
use std::{any::Any, sync::Arc};
634-
use tikv_client_proto::kvrpcpb;
639+
640+
use super::*;
635641

636642
#[tokio::test]
637643
async fn test_raw_coprocessor() -> Result<()> {
@@ -687,13 +693,13 @@ mod tests {
687693
"2:[Key(0A)..Key(0F), Key(14)..Key(FAFA)]".to_string(),
688694
vec![
689695
Key::from(vec![10])..Key::from(vec![15]),
690-
Key::from(vec![20])..Key::from(vec![250, 250])
696+
Key::from(vec![20])..Key::from(vec![250, 250]),
691697
]
692698
),
693699
(
694700
"3:[Key(FAFA)..Key()]".to_string(),
695701
vec![Key::from(vec![250, 250])..Key::from(vec![])]
696-
)
702+
),
697703
]
698704
);
699705
Ok(())

src/transaction/client.rs

Lines changed: 8 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -10,10 +10,10 @@ use crate::{
1010
backoff::{DEFAULT_REGION_BACKOFF, OPTIMISTIC_BACKOFF},
1111
config::Config,
1212
pd::{PdClient, PdRpcClient},
13-
request::{request_codec::RequestCodec, Plan},
13+
request::{Plan, request_codec::RequestCodec},
14+
Result,
1415
timestamp::TimestampExt,
1516
transaction::{Snapshot, Transaction, TransactionOptions},
16-
Result,
1717
};
1818

1919
use super::{requests::new_scan_lock_request, resolve_locks};
@@ -44,8 +44,8 @@ pub struct Client<C> {
4444
}
4545

4646
impl<C> Client<C>
47-
where
48-
C: RequestCodec,
47+
where
48+
C: RequestCodec,
4949
{
5050
/// Create a transactional [`Client`] and connect to the TiKV cluster.
5151
///
@@ -66,10 +66,11 @@ where
6666
/// ```
6767
pub async fn new<S: Into<String>>(
6868
pd_endpoints: Vec<S>,
69+
codec: C,
6970
logger: Option<Logger>,
7071
) -> Result<Client<C>> {
7172
// debug!(self.logger, "creating transactional client");
72-
Self::new_with_config(pd_endpoints, Config::default(), logger).await
73+
Self::new_with_config(pd_endpoints, Config::default(), codec, logger).await
7374
}
7475

7576
/// Create a transactional [`Client`] with a custom configuration, and connect to the TiKV cluster.
@@ -97,6 +98,7 @@ where
9798
pub async fn new_with_config<S: Into<String>>(
9899
pd_endpoints: Vec<S>,
99100
config: Config,
101+
codec: C,
100102
optional_logger: Option<Logger>,
101103
) -> Result<Client<C>> {
102104
let logger = optional_logger.unwrap_or_else(|| {
@@ -111,7 +113,7 @@ where
111113
});
112114
debug!(logger, "creating new transactional client");
113115
let pd_endpoints: Vec<String> = pd_endpoints.into_iter().map(Into::into).collect();
114-
let pd = Arc::new(PdRpcClient::connect(&pd_endpoints, config, true, logger.clone()).await?);
116+
let pd = Arc::new(PdRpcClient::connect(&pd_endpoints, config, codec, logger.clone()).await?);
115117
Ok(Client {
116118
pd,
117119
logger,

0 commit comments

Comments
 (0)