Skip to content

Commit e8fbe02

Browse files
authored
Merge branch 'master' into fix-update-regioncache
2 parents 62334a0 + 17d05c7 commit e8fbe02

File tree

2 files changed

+14
-16
lines changed

2 files changed

+14
-16
lines changed

src/pd/client.rs

Lines changed: 10 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -70,7 +70,7 @@ pub trait PdClient: Send + Sync + 'static {
7070
fn group_keys_by_region<K, K2>(
7171
self: Arc<Self>,
7272
keys: impl Iterator<Item = K> + Send + Sync + 'static,
73-
) -> BoxStream<'static, Result<(RegionId, Vec<K2>)>>
73+
) -> BoxStream<'static, Result<(RegionWithLeader, Vec<K2>)>>
7474
where
7575
K: AsRef<Key> + Into<K2> + Send + Sync + 'static,
7676
K2: Send + Sync + 'static,
@@ -81,15 +81,14 @@ pub trait PdClient: Send + Sync + 'static {
8181
async move {
8282
if let Some(key) = keys.next() {
8383
let region = this.region_for_key(key.as_ref()).await?;
84-
let id = region.id();
8584
let mut grouped = vec![key.into()];
8685
while let Some(key) = keys.peek() {
8786
if !region.contains(key.as_ref()) {
8887
break;
8988
}
9089
grouped.push(keys.next().unwrap().into());
9190
}
92-
Ok(Some((keys, (id, grouped))))
91+
Ok(Some((keys, (region, grouped))))
9392
} else {
9493
Ok(None)
9594
}
@@ -133,7 +132,7 @@ pub trait PdClient: Send + Sync + 'static {
133132
fn group_ranges_by_region(
134133
self: Arc<Self>,
135134
mut ranges: Vec<kvrpcpb::KeyRange>,
136-
) -> BoxStream<'static, Result<(RegionId, Vec<kvrpcpb::KeyRange>)>> {
135+
) -> BoxStream<'static, Result<(RegionWithLeader, Vec<kvrpcpb::KeyRange>)>> {
137136
ranges.reverse();
138137
stream_fn(Some(ranges), move |ranges| {
139138
let this = self.clone();
@@ -147,7 +146,6 @@ pub trait PdClient: Send + Sync + 'static {
147146
let start_key: Key = range.start_key.clone().into();
148147
let end_key: Key = range.end_key.clone().into();
149148
let region = this.region_for_key(&start_key).await?;
150-
let id = region.id();
151149
let region_start = region.start_key();
152150
let region_end = region.end_key();
153151
let mut grouped = vec![];
@@ -160,7 +158,7 @@ pub trait PdClient: Send + Sync + 'static {
160158
start_key: region_end.into(),
161159
end_key: end_key.into(),
162160
});
163-
return Ok(Some((Some(ranges), (id, grouped))));
161+
return Ok(Some((Some(ranges), (region, grouped))));
164162
}
165163
grouped.push(range);
166164

@@ -180,11 +178,11 @@ pub trait PdClient: Send + Sync + 'static {
180178
start_key: region_end.into(),
181179
end_key: end_key.into(),
182180
});
183-
return Ok(Some((Some(ranges), (id, grouped))));
181+
return Ok(Some((Some(ranges), (region, grouped))));
184182
}
185183
grouped.push(range);
186184
}
187-
Ok(Some((Some(ranges), (id, grouped))))
185+
Ok(Some((Some(ranges), (region, grouped))))
188186
} else {
189187
Ok(None)
190188
}
@@ -452,7 +450,7 @@ pub mod test {
452450
let ranges3 = stream.next().unwrap().unwrap();
453451
let ranges4 = stream.next().unwrap().unwrap();
454452

455-
assert_eq!(ranges1.0, 1);
453+
assert_eq!(ranges1.0.id(), 1);
456454
assert_eq!(
457455
ranges1.1,
458456
vec![
@@ -466,23 +464,23 @@ pub mod test {
466464
}
467465
]
468466
);
469-
assert_eq!(ranges2.0, 2);
467+
assert_eq!(ranges2.0.id(), 2);
470468
assert_eq!(
471469
ranges2.1,
472470
vec![kvrpcpb::KeyRange {
473471
start_key: k_split.clone(),
474472
end_key: k3
475473
}]
476474
);
477-
assert_eq!(ranges3.0, 1);
475+
assert_eq!(ranges3.0.id(), 1);
478476
assert_eq!(
479477
ranges3.1,
480478
vec![kvrpcpb::KeyRange {
481479
start_key: k2,
482480
end_key: k_split.clone()
483481
}]
484482
);
485-
assert_eq!(ranges4.0, 2);
483+
assert_eq!(ranges4.0.id(), 2);
486484
assert_eq!(
487485
ranges4.1,
488486
vec![kvrpcpb::KeyRange {

src/store.rs

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -39,10 +39,10 @@ where
3939
pd_client
4040
.clone()
4141
.group_keys_by_region(key_data)
42-
.and_then(move |(region_id, key)| {
42+
.and_then(move |(region, key)| {
4343
pd_client
4444
.clone()
45-
.store_for_id(region_id)
45+
.map_region_to_store(region)
4646
.map_ok(move |store| (key, store))
4747
})
4848
.boxed()
@@ -106,10 +106,10 @@ pub fn store_stream_for_ranges<PdC: PdClient>(
106106
pd_client
107107
.clone()
108108
.group_ranges_by_region(ranges)
109-
.and_then(move |(region_id, range)| {
109+
.and_then(move |(region, range)| {
110110
pd_client
111111
.clone()
112-
.store_for_id(region_id)
112+
.map_region_to_store(region)
113113
.map_ok(move |store| (range, store))
114114
})
115115
.boxed()

0 commit comments

Comments
 (0)