Skip to content

Commit 90f715f

Browse files
authored
Merge pull request #65 from nrc/futures-imports
Reorganise futures imports
2 parents 98108bf + 18f538c commit 90f715f

File tree

9 files changed

+59
-72
lines changed

9 files changed

+59
-72
lines changed

examples/transaction.rs

Lines changed: 1 addition & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -5,11 +5,7 @@
55
mod common;
66

77
use crate::common::parse_args;
8-
use futures::{
9-
future,
10-
prelude::{StreamExt, TryStreamExt},
11-
stream, TryFutureExt,
12-
};
8+
use futures::prelude::*;
139
use std::ops::RangeBounds;
1410
use tikv_client::{
1511
transaction::{Client, IsolationLevel},

src/compat.rs

Lines changed: 3 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -3,8 +3,7 @@
33
//! This module contains utility types and functions for making the transition
44
//! from futures 0.1 to 1.0 easier.
55
6-
use futures::prelude::{Future, Sink, Stream, StreamExt};
7-
use futures::stream::Fuse;
6+
use futures::prelude::*;
87
use futures::task::{Context, Poll};
98
use futures::try_ready;
109
use std::pin::Pin;
@@ -85,7 +84,7 @@ where
8584
St: Stream + Unpin,
8685
{
8786
sink: Option<Si>,
88-
stream: Option<Fuse<St>>,
87+
stream: Option<stream::Fuse<St>>,
8988
buffered: Option<St::Item>,
9089
}
9190

@@ -118,7 +117,7 @@ where
118117
)
119118
}
120119

121-
fn stream_mut(&mut self) -> Pin<&mut Fuse<St>> {
120+
fn stream_mut(&mut self) -> Pin<&mut stream::Fuse<St>> {
122121
Pin::new(
123122
self.stream
124123
.as_mut()

src/raw.rs

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -10,7 +10,8 @@
1010
//! **Warning:** It is not advisable to use both raw and transactional functionality in the same keyspace.
1111
//!
1212
use crate::{rpc::RpcClient, Config, Error, Key, KeyRange, KvPair, Result, Value};
13-
use futures::{future, task::Context, Future, Poll};
13+
use futures::prelude::*;
14+
use futures::task::{Context, Poll};
1415
use std::{fmt, ops::Bound, pin::Pin, sync::Arc, u32};
1516

1617
const MAX_RAW_KV_SCAN_LIMIT: u32 = 10240;

src/rpc/client.rs

Lines changed: 27 additions & 30 deletions
Original file line numberDiff line numberDiff line change
@@ -10,8 +10,7 @@ use std::{
1010
time::Duration,
1111
};
1212

13-
use futures::future::{self, ready, Either, Future};
14-
use futures::prelude::TryFutureExt;
13+
use futures::prelude::*;
1514
use grpcio::{EnvBuilder, Environment};
1615
use kvproto::kvrpcpb;
1716
use log::*;
@@ -153,33 +152,31 @@ impl RpcClient {
153152
let inner = self.inner();
154153
loop_fn((0, tasks, result), move |(mut index, tasks, mut result)| {
155154
if index == tasks.len() {
156-
Either::Left(future::ok(Loop::Break(result)))
155+
future::Either::Left(future::ok(Loop::Break(result)))
157156
} else {
158157
let inner = Arc::clone(&inner);
159-
Either::Right(
160-
inner
161-
.locate_key(tasks[index].key())
162-
.map_ok(move |location| {
163-
while let Some(item) = tasks.get(index) {
164-
if !location.contains(item.key()) {
165-
break;
166-
}
167-
let ver_id = location.ver_id();
168-
let item = item.clone();
169-
if let Some(ref mut grouped) = result {
170-
grouped.add(ver_id, item);
171-
} else {
172-
result = Some(GroupedTasks::new(ver_id, item));
173-
}
174-
index += 1;
158+
future::Either::Right(inner.locate_key(tasks[index].key()).map_ok(
159+
move |location| {
160+
while let Some(item) = tasks.get(index) {
161+
if !location.contains(item.key()) {
162+
break;
175163
}
176-
if index == tasks.len() {
177-
Loop::Break(result)
164+
let ver_id = location.ver_id();
165+
let item = item.clone();
166+
if let Some(ref mut grouped) = result {
167+
grouped.add(ver_id, item);
178168
} else {
179-
Loop::Continue((index, tasks, result))
169+
result = Some(GroupedTasks::new(ver_id, item));
180170
}
181-
}),
182-
)
171+
index += 1;
172+
}
173+
if index == tasks.len() {
174+
Loop::Break(result)
175+
} else {
176+
Loop::Continue((index, tasks, result))
177+
}
178+
},
179+
))
183180
}
184181
})
185182
.map_ok(|r| r.unwrap_or_default())
@@ -200,7 +197,7 @@ impl RpcClient {
200197
store,
201198
})
202199
})
203-
.and_then(move |region| ready(inner2.kv_client(region)))
200+
.and_then(move |region| future::ready(inner2.kv_client(region)))
204201
}
205202

206203
fn region_context_by_id(
@@ -217,7 +214,7 @@ impl RpcClient {
217214
.load_store(store_id)
218215
.map_ok(|store| RegionContext { region, store })
219216
})
220-
.and_then(move |region| ready(inner2.kv_client(region)))
217+
.and_then(move |region| future::ready(inner2.kv_client(region)))
221218
}
222219

223220
fn raw(
@@ -279,9 +276,9 @@ impl RpcClient {
279276
cf: Option<ColumnFamily>,
280277
) -> impl Future<Output = Result<()>> {
281278
if value.is_empty() {
282-
Either::Left(future::err(Error::empty_value()))
279+
future::Either::Left(future::err(Error::empty_value()))
283280
} else {
284-
Either::Right(
281+
future::Either::Right(
285282
Self::raw(self.inner(), &key, cf)
286283
.and_then(|context| context.client().raw_put(context, key, value)),
287284
)
@@ -294,10 +291,10 @@ impl RpcClient {
294291
cf: Option<ColumnFamily>,
295292
) -> impl Future<Output = Result<()>> {
296293
if pairs.iter().any(|p| p.value().is_empty()) {
297-
Either::Left(future::err(Error::empty_value()))
294+
future::Either::Left(future::err(Error::empty_value()))
298295
} else {
299296
let inner = self.inner();
300-
Either::Right(
297+
future::Either::Right(
301298
self.group_tasks_by_region(pairs)
302299
.and_then(move |task_groups| {
303300
let mut tasks = Vec::with_capacity(task_groups.len());

src/rpc/pd/client.rs

Lines changed: 5 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -7,8 +7,7 @@ use std::{
77
};
88

99
use futures::compat::Compat01As03;
10-
use futures::future::{ready, Future};
11-
use futures::prelude::{FutureExt, TryFutureExt};
10+
use futures::prelude::*;
1211
use grpcio::{CallOption, Environment};
1312
use kvproto::{metapb, pdpb, pdpb::PdClient as RpcClient};
1413

@@ -93,14 +92,14 @@ impl PdClient {
9392
let region = if resp.has_region() {
9493
resp.take_region()
9594
} else {
96-
return ready(Err(Error::region_for_key_not_found(key)));
95+
return future::ready(Err(Error::region_for_key_not_found(key)));
9796
};
9897
let leader = if resp.has_leader() {
9998
Some(resp.take_leader())
10099
} else {
101100
None
102101
};
103-
ready(Ok((region, leader)))
102+
future::ready(Ok((region, leader)))
104103
})
105104
}
106105

@@ -122,14 +121,14 @@ impl PdClient {
122121
let region = if resp.has_region() {
123122
resp.take_region()
124123
} else {
125-
return ready(Err(Error::region_not_found(region_id, None)));
124+
return future::ready(Err(Error::region_not_found(region_id, None)));
126125
};
127126
let leader = if resp.has_leader() {
128127
Some(resp.take_leader())
129128
} else {
130129
None
131130
};
132-
ready(Ok((region, leader)))
131+
future::ready(Ok((region, leader)))
133132
})
134133
}
135134

src/rpc/pd/leader.rs

Lines changed: 11 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -7,21 +7,16 @@ use std::{
77
time::{Duration, Instant},
88
};
99

10-
use futures::prelude::{FutureExt, SinkExt, StreamExt, TryFutureExt};
11-
use futures::{
12-
channel::{
13-
mpsc::{channel, unbounded, Receiver, Sender, UnboundedReceiver, UnboundedSender},
14-
oneshot,
15-
},
16-
compat::{Compat01As03, Compat01As03Sink},
17-
future::ready,
18-
stream::TryStreamExt,
19-
Future,
10+
use futures::channel::{
11+
mpsc::{channel, unbounded, Receiver, Sender, UnboundedReceiver, UnboundedSender},
12+
oneshot,
2013
};
14+
use futures::compat::{Compat01As03, Compat01As03Sink};
15+
use futures::prelude::*;
2116
use grpcio::{CallOption, Environment, WriteFlags};
2217
use kvproto::pdpb;
2318
use log::*;
24-
use tokio_core::reactor::{Core, Handle as OtherHandle};
19+
use tokio_core::reactor::{Core, Handle as TokioHandle};
2520

2621
use crate::{
2722
compat::SinkCompat,
@@ -119,15 +114,15 @@ impl PdReactor {
119114
let mut core = Core::new().unwrap();
120115
let handle = core.handle();
121116
{
122-
let f = rx.take_while(|t| ready(t.is_some())).for_each(|t| {
117+
let f = rx.take_while(|t| future::ready(t.is_some())).for_each(|t| {
123118
Self::dispatch(&client, t.unwrap(), &handle);
124-
ready(())
119+
future::ready(())
125120
});
126121
core.run(TryFutureExt::compat(f.unit_error())).unwrap();
127122
}
128123
}
129124

130-
fn init(client: &Arc<RwLock<LeaderClient>>, handle: &OtherHandle) {
125+
fn init(client: &Arc<RwLock<LeaderClient>>, handle: &TokioHandle) {
131126
let client = Arc::clone(client);
132127
let (tx, rx) = client.write().unwrap().client.tso().unwrap();
133128
let tx = Compat01As03Sink::new(tx);
@@ -164,7 +159,7 @@ impl PdReactor {
164159
// Schedule another tso_batch of request
165160
reactor.schedule(PdTask::Request);
166161
}
167-
ready(Ok(()))
162+
future::ready(Ok(()))
168163
})
169164
.map_err(|e| panic!("unexpected error: {:?}", e))
170165
.compat(),
@@ -204,7 +199,7 @@ impl PdReactor {
204199
client.write().unwrap().reactor.tso_buffer = Some(requests);
205200
}
206201

207-
fn dispatch(client: &Arc<RwLock<LeaderClient>>, task: PdTask, handle: &OtherHandle) {
202+
fn dispatch(client: &Arc<RwLock<LeaderClient>>, task: PdTask, handle: &TokioHandle) {
208203
match task {
209204
PdTask::Request => Self::tso_request(client),
210205
PdTask::Response(requests, response) => Self::tso_response(client, requests, &response),

src/rpc/pd/request.rs

Lines changed: 6 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -7,8 +7,7 @@ use std::{
77
};
88

99
use futures::compat::Compat01As03;
10-
use futures::future::{ok, ready, Either, Future, TryFutureExt};
11-
use futures::prelude::FutureExt;
10+
use futures::prelude::*;
1211
use log::*;
1312
use tokio_timer::timer::Handle;
1413

@@ -61,7 +60,7 @@ where
6160
debug!("reconnect remains: {}", self.reconnect_count);
6261

6362
if self.request_sent < MAX_REQUEST_COUNT {
64-
return Either::Left(ok(self));
63+
return future::Either::Left(future::ok(self));
6564
}
6665

6766
// Updating client.
@@ -71,9 +70,9 @@ where
7170
match (self.reconnect)(&self.client, RECONNECT_INTERVAL_SEC) {
7271
Ok(_) => {
7372
self.request_sent = 0;
74-
Either::Left(ok(self))
73+
future::Either::Left(future::ok(self))
7574
}
76-
Err(_) => Either::Right(
75+
Err(_) => future::Either::Right(
7776
Compat01As03::new(
7877
self.timer
7978
.delay(Instant::now() + Duration::from_secs(RECONNECT_INTERVAL_SEC)),
@@ -87,7 +86,7 @@ where
8786
self.request_sent += 1;
8887
debug!("request sent: {}", self.request_sent);
8988

90-
ok(self).and_then(|mut ctx| {
89+
future::ok(self).and_then(|mut ctx| {
9190
let req = (ctx.func)(&ctx.client);
9291
req.map(|resp| match resp {
9392
Ok(resp) => {
@@ -128,6 +127,6 @@ where
128127
.and_then(Self::send_and_receive)
129128
.map(Self::break_or_continue)
130129
})
131-
.and_then(|r| ready(r.post_loop()))
130+
.and_then(|r| future::ready(r.post_loop()))
132131
}
133132
}

src/rpc/tikv/client.rs

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -6,8 +6,7 @@
66
use std::{fmt, sync::Arc, time::Duration};
77

88
use futures::compat::Compat01As03;
9-
use futures::future::Future;
10-
use futures::prelude::{FutureExt, TryFutureExt};
9+
use futures::prelude::*;
1110
use grpcio::{CallOption, Environment};
1211
use kvproto::{errorpb, kvrpcpb, tikvpb::TikvClient};
1312

src/transaction.rs

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -10,7 +10,9 @@
1010
//!
1111
1212
use crate::{Config, Error, Key, KvPair, Value};
13-
use futures::{task::Context, Future, Poll, Stream};
13+
14+
use futures::prelude::*;
15+
use futures::task::{Context, Poll};
1416
use std::ops::RangeBounds;
1517
use std::pin::Pin;
1618

0 commit comments

Comments
 (0)