Skip to content

Commit 6615d3d

Browse files
authored
Merge pull request #59 from nrc/rpc
refactor the rpc module
2 parents f2687e9 + e88880f commit 6615d3d

File tree

3 files changed

+35
-136
lines changed

3 files changed

+35
-136
lines changed

src/rpc/client.rs

Lines changed: 31 additions & 62 deletions
Original file line numberDiff line numberDiff line change
@@ -4,9 +4,8 @@
44
#![allow(dead_code)]
55

66
use std::{
7-
collections::HashMap,
7+
collections::hash_map::{self, HashMap},
88
fmt,
9-
ops::Deref,
109
sync::{Arc, RwLock},
1110
time::Duration,
1211
};
@@ -108,7 +107,7 @@ impl RpcClientInner {
108107
}
109108

110109
fn locate_key(&self, key: &Key) -> impl Future<Output = Result<KeyLocation>> {
111-
self.load_region(key).map_ok(KeyLocation::new)
110+
self.load_region(key)
112111
}
113112

114113
fn kv_client(&self, context: RegionContext) -> Result<(RegionContext, Arc<KvClient>)> {
@@ -197,7 +196,7 @@ impl RpcClient {
197196
let peer = location.peer().expect("leader must exist");
198197
let store_id = peer.get_store_id();
199198
inner.load_store(store_id).map_ok(|store| RegionContext {
200-
region: location.into_inner(),
199+
region: location,
201200
store,
202201
})
203202
})
@@ -257,7 +256,6 @@ impl RpcClient {
257256
let inner = self.inner();
258257
self.group_tasks_by_region(keys)
259258
.and_then(move |task_groups| {
260-
let task_groups = task_groups.into_inner();
261259
let mut tasks = Vec::with_capacity(task_groups.len());
262260
for (region, keys) in task_groups.into_iter() {
263261
let inner = Arc::clone(&inner);
@@ -302,7 +300,6 @@ impl RpcClient {
302300
Either::Right(
303301
self.group_tasks_by_region(pairs)
304302
.and_then(move |task_groups| {
305-
let task_groups = task_groups.into_inner();
306303
let mut tasks = Vec::with_capacity(task_groups.len());
307304
for (region, pairs) in task_groups.into_iter() {
308305
let inner = Arc::clone(&inner);
@@ -336,7 +333,6 @@ impl RpcClient {
336333
let inner = self.inner();
337334
self.group_tasks_by_region(keys)
338335
.and_then(move |task_groups| {
339-
let task_groups = task_groups.into_inner();
340336
let mut tasks = Vec::with_capacity(task_groups.len());
341337
for (region, keys) in task_groups.into_iter() {
342338
let inner = Arc::clone(&inner);
@@ -374,21 +370,27 @@ impl RpcClient {
374370
let inner = Arc::clone(&self.inner);
375371
loop_fn((inner, scan), |(inner, scan)| {
376372
inner.locate_key(scan.start_key()).and_then(|location| {
377-
let region = location.into_inner();
378-
let cf = scan.cf.clone();
379-
Self::region_context_by_id(Arc::clone(&inner), region.id)
373+
let region = location;
374+
let cf = scan.state.cf.clone();
375+
Self::region_context_by_id(Arc::clone(&inner), region.id())
380376
.map_ok(|(region, client)| {
381377
(scan, region.range(), RawContext::new(region, client, cf))
382378
})
383379
.and_then(|(mut scan, region_range, context)| {
384380
let (start_key, end_key) = scan.range();
385381
context
386382
.client()
387-
.raw_scan(context, start_key, end_key, scan.limit, scan.key_only)
383+
.raw_scan(
384+
context,
385+
start_key,
386+
end_key,
387+
scan.state.limit,
388+
scan.state.key_only,
389+
)
388390
.map_ok(|pairs| (scan, region_range, pairs))
389391
})
390392
.map_ok(|(mut scan, region_range, mut pairs)| {
391-
let limit = scan.limit;
393+
let limit = scan.state.limit;
392394
scan.result_mut().append(&mut pairs);
393395
if scan.result().len() as u32 >= limit {
394396
Loop::Break(scan.into_inner())
@@ -424,9 +426,9 @@ impl RpcClient {
424426
let inner = Arc::clone(&self.inner);
425427
loop_fn((inner, scan), |(inner, scan)| {
426428
inner.locate_key(scan.start_key()).and_then(|location| {
427-
let region = location.into_inner();
428-
let cf = scan.clone();
429-
Self::region_context_by_id(Arc::clone(&inner), region.id)
429+
let region = location;
430+
let cf = scan.state.clone();
431+
Self::region_context_by_id(Arc::clone(&inner), region.id())
430432
.map_ok(|(region, client)| {
431433
(scan, region.range(), RawContext::new(region, client, cf))
432434
})
@@ -482,9 +484,9 @@ impl RegionContext {
482484
impl From<RegionContext> for kvrpcpb::Context {
483485
fn from(mut ctx: RegionContext) -> kvrpcpb::Context {
484486
let mut kvctx = kvrpcpb::Context::default();
485-
kvctx.set_region_id(ctx.region.id);
486-
kvctx.set_region_epoch(ctx.region.take_region_epoch());
487-
kvctx.set_peer(ctx.region.peer().expect("leader must exist").into_inner());
487+
kvctx.set_region_id(ctx.region.id());
488+
kvctx.set_region_epoch(ctx.region.region.take_region_epoch());
489+
kvctx.set_peer(ctx.region.peer().expect("leader must exist"));
488490
kvctx
489491
}
490492
}
@@ -523,25 +525,7 @@ impl TxnContext {
523525
}
524526
}
525527

526-
struct KeyLocation(Region);
527-
528-
impl KeyLocation {
529-
fn new(region: Region) -> Self {
530-
KeyLocation(region)
531-
}
532-
533-
fn into_inner(self) -> Region {
534-
self.0
535-
}
536-
}
537-
538-
impl Deref for KeyLocation {
539-
type Target = Region;
540-
541-
fn deref(&self) -> &Self::Target {
542-
&self.0
543-
}
544-
}
528+
type KeyLocation = Region;
545529

546530
trait GroupingTask: Clone + Default + Sized {
547531
fn key(&self) -> &Key;
@@ -550,36 +534,33 @@ trait GroupingTask: Clone + Default + Sized {
550534
#[derive(Default)]
551535
struct GroupedTasks<Task: GroupingTask>(HashMap<RegionVerId, Vec<Task>>, RegionVerId);
552536

553-
impl<Task> GroupedTasks<Task>
554-
where
555-
Task: GroupingTask,
556-
{
537+
impl<Task: GroupingTask> GroupedTasks<Task> {
557538
fn new(ver_id: RegionVerId, task: Task) -> Self {
558539
let mut map = HashMap::with_capacity(1);
559540
map.insert(ver_id.clone(), vec![task]);
560541
GroupedTasks(map, ver_id)
561542
}
562543

544+
#[inline]
563545
fn add(&mut self, ver_id: RegionVerId, task: Task) {
564546
self.0
565547
.entry(ver_id)
566548
.or_insert_with(|| Vec::with_capacity(1))
567549
.push(task)
568550
}
569551

570-
fn into_inner(self) -> HashMap<RegionVerId, Vec<Task>> {
571-
self.0
552+
#[inline]
553+
fn len(&self) -> usize {
554+
self.0.len()
572555
}
573556
}
574557

575-
impl<Task> Deref for GroupedTasks<Task>
576-
where
577-
Task: GroupingTask,
578-
{
579-
type Target = HashMap<RegionVerId, Vec<Task>>;
558+
impl<Task: GroupingTask> IntoIterator for GroupedTasks<Task> {
559+
type Item = (RegionVerId, Vec<Task>);
560+
type IntoIter = hash_map::IntoIter<RegionVerId, Vec<Task>>;
580561

581-
fn deref(&self) -> &Self::Target {
582-
&self.0
562+
fn into_iter(self) -> hash_map::IntoIter<RegionVerId, Vec<Task>> {
563+
self.0.into_iter()
583564
}
584565
}
585566

@@ -666,15 +647,3 @@ where
666647
&self.result
667648
}
668649
}
669-
670-
impl<Res, State> Deref for ScanRegionsContext<Res, State>
671-
where
672-
Res: Default,
673-
State: Sized,
674-
{
675-
type Target = State;
676-
677-
fn deref(&self) -> &Self::Target {
678-
&self.state
679-
}
680-
}

src/rpc/pd/client.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -195,7 +195,7 @@ impl PdClient {
195195
cli.get_store_async_opt(&req, opt).map(Compat01As03::new)
196196
},
197197
))
198-
.map_ok(|mut resp| resp.take_store().into())
198+
.map_ok(|mut resp| resp.take_store())
199199
}
200200

201201
pub fn get_region(&self, key: &[u8]) -> impl Future<Output = Result<Region>> {

src/rpc/pd/mod.rs

Lines changed: 3 additions & 73 deletions
Original file line numberDiff line numberDiff line change
@@ -3,8 +3,7 @@
33
// TODO: Remove this when txn is done.
44
#![allow(dead_code)]
55

6-
use std::ops::{Deref, DerefMut};
7-
6+
pub use kvproto::metapb::{Peer, Store};
87
use kvproto::{kvrpcpb, metapb};
98

109
pub use crate::rpc::pd::client::PdClient;
@@ -32,26 +31,9 @@ pub struct Region {
3231
pub leader: Option<Peer>,
3332
}
3433

35-
impl Deref for Region {
36-
type Target = metapb::Region;
37-
38-
fn deref(&self) -> &Self::Target {
39-
&self.region
40-
}
41-
}
42-
43-
impl DerefMut for Region {
44-
fn deref_mut(&mut self) -> &mut Self::Target {
45-
&mut self.region
46-
}
47-
}
48-
4934
impl Region {
5035
pub fn new(region: metapb::Region, leader: Option<metapb::Peer>) -> Self {
51-
Region {
52-
region,
53-
leader: leader.map(Peer),
54-
}
36+
Region { region, leader }
5537
}
5638

5739
pub fn switch_peer(&mut self, _to: StoreId) -> Result<()> {
@@ -109,59 +91,7 @@ impl Region {
10991
}
11092

11193
pub fn meta(&self) -> metapb::Region {
112-
Clone::clone(&self.region)
113-
}
114-
}
115-
116-
#[derive(Clone, Debug, PartialEq)]
117-
pub struct Store(metapb::Store);
118-
119-
impl From<metapb::Store> for Store {
120-
fn from(store: metapb::Store) -> Store {
121-
Store(store)
122-
}
123-
}
124-
125-
impl Deref for Store {
126-
type Target = metapb::Store;
127-
128-
fn deref(&self) -> &Self::Target {
129-
&self.0
130-
}
131-
}
132-
133-
impl DerefMut for Store {
134-
fn deref_mut(&mut self) -> &mut Self::Target {
135-
&mut self.0
136-
}
137-
}
138-
139-
#[derive(Clone, Default, Debug, PartialEq)]
140-
pub struct Peer(metapb::Peer);
141-
142-
impl From<metapb::Peer> for Peer {
143-
fn from(peer: metapb::Peer) -> Peer {
144-
Peer(peer)
145-
}
146-
}
147-
148-
impl Deref for Peer {
149-
type Target = metapb::Peer;
150-
151-
fn deref(&self) -> &Self::Target {
152-
&self.0
153-
}
154-
}
155-
156-
impl DerefMut for Peer {
157-
fn deref_mut(&mut self) -> &mut Self::Target {
158-
&mut self.0
159-
}
160-
}
161-
162-
impl Peer {
163-
pub fn into_inner(self) -> metapb::Peer {
164-
self.0
94+
self.region.clone()
16595
}
16696
}
16797

0 commit comments

Comments
 (0)