Skip to content

Commit cfa5f8e

Browse files
committed
Replace a Loop closure with a Stream in rpc/client
Signed-off-by: Nick Cameron <nrc@ncameron.org>
1 parent d773403 commit cfa5f8e

File tree

2 files changed

+53
-110
lines changed

2 files changed

+53
-110
lines changed

examples/raw.rs

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -62,8 +62,7 @@ async fn main() -> Result<()> {
6262
.expect("Could not get just deleted entry");
6363
assert!(value.is_none());
6464

65-
// FIXME: batch commands seem to be broken due to over-large types. I think
66-
// LoopFn is to blame.
65+
// FIXME: batch commands seem to be broken due to over-large types.
6766
// You can ask to write multiple key-values at the same time, it is much more
6867
// performant because it is passed in one request to the key-value store.
6968
// let pairs = vec![

src/rpc/client.rs

Lines changed: 52 additions & 108 deletions
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,7 @@
44
#![allow(dead_code)]
55

66
use std::{
7-
collections::hash_map::{self, HashMap},
7+
collections::{HashMap, VecDeque},
88
fmt,
99
sync::{Arc, RwLock},
1010
time::Duration,
@@ -17,11 +17,11 @@ use grpcio::{EnvBuilder, Environment};
1717
use kvproto::kvrpcpb;
1818

1919
use crate::{
20-
compat::{loop_fn, stream_fn, ClientFutureExt, Loop},
20+
compat::{stream_fn, ClientFutureExt},
2121
kv::BoundRange,
2222
raw::ColumnFamily,
2323
rpc::{
24-
pd::{PdClient, Region, RegionId, RegionVerId, Store, StoreId},
24+
pd::{PdClient, Region, RegionId, Store, StoreId},
2525
security::SecurityManager,
2626
tikv::KvClient,
2727
},
@@ -183,26 +183,24 @@ impl RpcClient {
183183
cf: Option<ColumnFamily>,
184184
) -> impl Future<Output = Result<Vec<KvPair>>> {
185185
let inner = self.inner.clone();
186-
self.group_tasks_by_region(keys)
187-
.and_then(move |task_groups| {
188-
let tasks: Vec<_> = task_groups
189-
.into_iter()
190-
.map(|(region_ver_id, keys)| {
191-
let cf = cf.clone();
192-
inner
193-
.clone()
194-
.raw_from_id(region_ver_id.id)
195-
.and_then(|context| {
196-
context
197-
.client
198-
.raw_batch_get(context.region, cf, keys.into_iter())
199-
})
186+
self.group_tasks_by_region(keys).try_fold(
187+
Vec::new(),
188+
move |mut result, (region_id, keys)| {
189+
let cf = cf.clone();
190+
inner
191+
.clone()
192+
.raw_from_id(region_id)
193+
.and_then(|context| {
194+
context
195+
.client
196+
.raw_batch_get(context.region, cf, keys.into_iter())
200197
})
201-
.collect();
202-
203-
future::try_join_all(tasks)
204-
})
205-
.map_ok(|r| r.into_iter().flat_map(|a| a.into_iter()).collect())
198+
.map_ok(move |mut pairs| {
199+
result.append(&mut pairs);
200+
result
201+
})
202+
},
203+
)
206204
}
207205

208206
pub fn raw_put(
@@ -232,23 +230,15 @@ impl RpcClient {
232230
future::Either::Left(future::err(Error::empty_value()))
233231
} else {
234232
let inner = self.inner.clone();
235-
future::Either::Right(
236-
self.group_tasks_by_region(pairs)
237-
.and_then(move |task_groups| {
238-
let tasks: Vec<_> = task_groups
239-
.into_iter()
240-
.map(|(region, keys)| {
241-
let cf = cf.clone();
242-
inner.clone().raw_from_id(region.id).and_then(|context| {
243-
context.client.raw_batch_put(context.region, cf, keys)
244-
})
245-
})
246-
.collect();
247-
248-
future::try_join_all(tasks)
249-
})
250-
.map_ok(|_| ()),
251-
)
233+
future::Either::Right(self.group_tasks_by_region(pairs).try_for_each(
234+
move |(region_id, keys)| {
235+
let cf = cf.clone();
236+
inner
237+
.clone()
238+
.raw_from_id(region_id)
239+
.and_then(|context| context.client.raw_batch_put(context.region, cf, keys))
240+
},
241+
))
252242
}
253243
}
254244

@@ -270,20 +260,13 @@ impl RpcClient {
270260
) -> impl Future<Output = Result<()>> {
271261
let inner = self.inner.clone();
272262
self.group_tasks_by_region(keys)
273-
.and_then(move |task_groups| {
274-
let tasks: Vec<_> = task_groups
275-
.into_iter()
276-
.map(|(region, keys)| {
277-
let cf = cf.clone();
278-
inner.clone().raw_from_id(region.id).and_then(|context| {
279-
context.client.raw_batch_delete(context.region, cf, keys)
280-
})
281-
})
282-
.collect();
283-
284-
future::try_join_all(tasks)
263+
.try_for_each(move |(region_id, keys)| {
264+
let cf = cf.clone();
265+
inner
266+
.clone()
267+
.raw_from_id(region_id)
268+
.and_then(|context| context.client.raw_batch_delete(context.region, cf, keys))
285269
})
286-
.map_ok(|_| ())
287270
}
288271

289272
pub fn raw_scan(
@@ -368,42 +351,31 @@ impl RpcClient {
368351
fn group_tasks_by_region<Task>(
369352
&self,
370353
tasks: Vec<Task>,
371-
) -> impl Future<Output = Result<GroupedTasks<Task>>>
354+
) -> impl Stream<Item = Result<(RegionId, Vec<Task>)>>
372355
where
373356
Task: GroupingTask,
374357
{
375-
let result: Option<GroupedTasks<Task>> = None;
376358
let inner = self.inner.clone();
377-
loop_fn((0, tasks, result), move |(mut index, tasks, mut result)| {
378-
if index == tasks.len() {
379-
future::Either::Left(future::ok(Loop::Break(result)))
359+
let tasks: VecDeque<Task> = tasks.into();
360+
361+
stream_fn(tasks, move |mut tasks| {
362+
if tasks.is_empty() {
363+
return Either::Right(ready(Ok(None)));
380364
} else {
381-
let inner = Arc::clone(&inner);
382-
future::Either::Right(inner.get_region(tasks[index].key()).map_ok(
383-
move |location| {
384-
while let Some(item) = tasks.get(index) {
385-
if !location.contains(item.key()) {
386-
break;
387-
}
388-
let ver_id = location.ver_id();
389-
let item = item.clone();
390-
if let Some(ref mut grouped) = result {
391-
grouped.add(ver_id, item);
392-
} else {
393-
result = Some(GroupedTasks::new(ver_id, item));
394-
}
395-
index += 1;
396-
}
397-
if index == tasks.len() {
398-
Loop::Break(result)
399-
} else {
400-
Loop::Continue((index, tasks, result))
365+
let inner = inner.clone();
366+
Either::Left(inner.get_region(tasks[0].key()).map_ok(move |location| {
367+
let ver_id = location.ver_id();
368+
let mut grouped = Vec::new();
369+
while let Some(task) = tasks.pop_front() {
370+
if !location.contains(task.key()) {
371+
break;
401372
}
402-
},
403-
))
373+
grouped.push(task);
374+
}
375+
Some((tasks, (ver_id.id, grouped)))
376+
}))
404377
}
405378
})
406-
.map_ok(|r| r.unwrap_or_default())
407379
}
408380
}
409381

@@ -463,34 +435,6 @@ trait GroupingTask: Clone + Default + Sized {
463435
fn key(&self) -> &Key;
464436
}
465437

466-
#[derive(Default)]
467-
struct GroupedTasks<Task: GroupingTask>(HashMap<RegionVerId, Vec<Task>>, RegionVerId);
468-
469-
impl<Task: GroupingTask> GroupedTasks<Task> {
470-
fn new(ver_id: RegionVerId, task: Task) -> Self {
471-
let mut map = HashMap::with_capacity(1);
472-
map.insert(ver_id.clone(), vec![task]);
473-
GroupedTasks(map, ver_id)
474-
}
475-
476-
#[inline]
477-
fn add(&mut self, ver_id: RegionVerId, task: Task) {
478-
self.0
479-
.entry(ver_id)
480-
.or_insert_with(|| Vec::with_capacity(1))
481-
.push(task)
482-
}
483-
}
484-
485-
impl<Task: GroupingTask> IntoIterator for GroupedTasks<Task> {
486-
type Item = (RegionVerId, Vec<Task>);
487-
type IntoIter = hash_map::IntoIter<RegionVerId, Vec<Task>>;
488-
489-
fn into_iter(self) -> hash_map::IntoIter<RegionVerId, Vec<Task>> {
490-
self.0.into_iter()
491-
}
492-
}
493-
494438
impl GroupingTask for Key {
495439
fn key(&self) -> &Key {
496440
self

0 commit comments

Comments
 (0)