Skip to content

Commit 164c00f

Browse files
authored
Merge pull request #5 from stepfunc/feature/tokio-1.0
Feature/tokio 1.0
2 parents 0784e13 + ca23629 commit 164c00f

File tree

7 files changed

+55
-66
lines changed

7 files changed

+55
-66
lines changed

Cargo.toml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -5,4 +5,4 @@ authors = ["Émile Grégoire <emile@stepfunc.io>"]
55
edition = "2018"
66

77
[dependencies]
8-
tokio = { version = "^0.2.23", features = ["tcp", "dns", "sync", "io-util", "io-std", "time", "rt-core", "rt-threaded", "macros"] }
8+
tokio = { version = "1", features = ["net", "sync", "io-util", "io-std", "time", "rt", "rt-multi-thread", "macros"] }

src/mock/mod.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -53,7 +53,7 @@ mod tests {
5353
fn select_task(mut rx: sync::mpsc::Receiver<u32>) -> Spawn<impl Future<Output = Option<u32>>> {
5454
crate::mock::test::spawn(async move {
5555
tokio::select! {
56-
_ = time::delay_for(Duration::from_secs(1)) => {
56+
_ = time::sleep(Duration::from_secs(1)) => {
5757
None
5858
}
5959
value = rx.recv() => {

src/mock/sync/mpsc.rs

Lines changed: 15 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,7 @@ use std::pin::Pin;
55
use std::sync::{Arc, Mutex};
66
use std::task::{Context, Poll};
77

8-
use tokio::sync::mpsc::error::{SendError, TryRecvError, TrySendError};
8+
use tokio::sync::mpsc::error::{SendError, TrySendError};
99

1010
pub use tokio::sync::mpsc::error;
1111

@@ -27,13 +27,13 @@ impl<T> ChannelData<T> {
2727
}
2828
}
2929

30-
fn try_recv(&mut self) -> Result<T, TryRecvError> {
30+
fn poll_recv(&mut self) -> Poll<Option<T>> {
3131
if let Some(msg) = self.queue.pop_front() {
32-
Ok(msg)
32+
Poll::Ready(Some(msg))
3333
} else if self.num_senders == 0 {
34-
Err(TryRecvError::Closed)
34+
Poll::Ready(None)
3535
} else {
36-
Err(TryRecvError::Empty)
36+
Poll::Pending
3737
}
3838
}
3939

@@ -62,11 +62,7 @@ impl<T> Future for ReceiveFuture<T> {
6262
type Output = Option<T>;
6363

6464
fn poll(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<Self::Output> {
65-
match self.data.lock().unwrap().try_recv() {
66-
Ok(msg) => Poll::Ready(Some(msg)),
67-
Err(TryRecvError::Closed) => Poll::Ready(None),
68-
Err(TryRecvError::Empty) => Poll::Pending,
69-
}
65+
self.data.lock().unwrap().poll_recv()
7066
}
7167
}
7268

@@ -110,8 +106,8 @@ impl<T> Receiver<T> {
110106
}
111107
}
112108

113-
pub fn try_recv(&mut self) -> Result<T, TryRecvError> {
114-
self.data.lock().unwrap().try_recv()
109+
pub fn poll_recv(&mut self) -> Poll<Option<T>> {
110+
self.data.lock().unwrap().poll_recv()
115111
}
116112

117113
pub fn close(&mut self) {
@@ -279,14 +275,14 @@ mod tests {
279275
fn dropping_tx_try_recv() {
280276
let (mut tx, mut rx) = channel(16);
281277

282-
assert_eq!(rx.try_recv(), Err(TryRecvError::Empty));
278+
assert_eq!(rx.poll_recv(), Poll::Pending);
283279
assert_ready!(spawn(async move {
284280
tx.send(()).await.unwrap();
285281
drop(tx);
286282
})
287283
.poll());
288-
assert_eq!(rx.try_recv(), Ok(()));
289-
assert_eq!(rx.try_recv(), Err(TryRecvError::Closed));
284+
assert_eq!(rx.poll_recv(), Poll::Ready(Some(())));
285+
assert_eq!(rx.poll_recv(), Poll::Ready(None));
290286
}
291287

292288
#[test]
@@ -346,7 +342,7 @@ mod tests {
346342
assert!(tx.try_send(()).is_ok());
347343
}
348344
assert!(matches!(tx.try_send(()), Err(TrySendError::Full(()))));
349-
assert!(rx.try_recv().is_ok());
345+
assert_eq!(rx.poll_recv(), Poll::Ready(Some(())));
350346
assert!(tx.try_send(()).is_ok());
351347
}
352348
}
@@ -370,11 +366,11 @@ mod tests {
370366
fn dropping_tx_try_recv() {
371367
let (mut tx, mut rx) = unbounded_channel();
372368

373-
assert_eq!(rx.try_recv(), Err(TryRecvError::Empty));
369+
assert_eq!(rx.poll_recv(), Poll::Pending);
374370
tx.send(()).unwrap();
375371
drop(tx);
376-
assert_eq!(rx.try_recv(), Ok(()));
377-
assert_eq!(rx.try_recv(), Err(TryRecvError::Closed));
372+
assert_eq!(rx.poll_recv(), Poll::Ready(Some(())));
373+
assert_eq!(rx.poll_recv(), Poll::Ready(None));
378374
}
379375

380376
#[test]

src/mock/test/io.rs

Lines changed: 31 additions & 38 deletions
Original file line numberDiff line numberDiff line change
@@ -3,9 +3,11 @@ use std::collections::VecDeque;
33
use std::pin::Pin;
44
use std::sync::{Arc, Mutex};
55
use std::task::{Context, Poll};
6+
use tokio::io::ReadBuf;
67

8+
#[derive(Debug)]
79
enum ChannelState {
8-
Open(VecDeque<u8>),
10+
Open(VecDeque<Box<[u8]>>),
911
Closed,
1012
Error(ErrorKind),
1113
}
@@ -17,9 +19,12 @@ impl ChannelState {
1719

1820
fn push(&mut self, data: &[u8]) {
1921
match self {
20-
Self::Open(buffer) => buffer.extend(data.iter()),
22+
Self::Open(buffer) => {
23+
buffer.push_back(Vec::from(data).into_boxed_slice());
24+
}
2125
_ => {
22-
let buffer = data.iter().copied().collect();
26+
let mut buffer = VecDeque::new();
27+
buffer.push_back(Vec::from(data).into_boxed_slice());
2328
*self = Self::Open(buffer);
2429
}
2530
}
@@ -41,6 +46,7 @@ impl ChannelState {
4146
}
4247
}
4348

49+
#[derive(Debug)]
4450
struct Shared {
4551
write_pending: bool,
4652
write_channel: ChannelState,
@@ -110,33 +116,31 @@ impl Drop for Handle {
110116
}
111117
}
112118

119+
#[derive(Debug)]
113120
pub struct MockIO(Arc<Mutex<Shared>>);
114121

115122
impl AsyncRead for MockIO {
116123
fn poll_read(
117124
self: Pin<&mut Self>,
118125
_cx: &mut Context<'_>,
119-
buf: &mut [u8],
120-
) -> Poll<std::io::Result<usize>> {
126+
buf: &mut ReadBuf<'_>,
127+
) -> Poll<std::io::Result<()>> {
121128
let mut shared = self.0.lock().unwrap();
122129

123130
match &mut shared.read_channel {
124-
ChannelState::Open(data) => match data.len() {
125-
0 => Poll::Pending,
126-
_ => {
127-
let mut num_bytes = 0;
128-
for dest in buf.iter_mut() {
129-
if let Some(byte) = data.pop_front() {
130-
*dest = byte;
131-
num_bytes += 1;
132-
} else {
133-
return Poll::Ready(Ok(num_bytes));
131+
ChannelState::Open(data) => {
132+
match data.pop_front() {
133+
None => Poll::Pending,
134+
Some(bytes) => {
135+
if bytes.len() > buf.remaining() {
136+
panic!("insufficient write space (available == {}) for queued read (len = {})", buf.remaining(), bytes.len());
134137
}
138+
buf.put_slice(&bytes);
139+
Poll::Ready(Ok(()))
135140
}
136-
Poll::Ready(Ok(num_bytes))
137141
}
138-
},
139-
ChannelState::Closed => Poll::Ready(Ok(0)),
142+
}
143+
ChannelState::Closed => Poll::Ready(Ok(())),
140144
ChannelState::Error(err) => Poll::Ready(Err(Error::new(*err, "test error"))),
141145
}
142146
}
@@ -151,33 +155,22 @@ impl AsyncWrite for MockIO {
151155
let mut shared = self.0.lock().unwrap();
152156

153157
match &mut shared.write_channel {
154-
ChannelState::Open(data) => match data.len() {
155-
0 => {
158+
ChannelState::Open(data) => match data.pop_front() {
159+
None => {
156160
shared.write_pending = true;
157161
Poll::Pending
158162
}
159-
_ => {
160-
let copy = data.clone();
161-
162-
let mut num_bytes = 0;
163-
for dest in buf.iter() {
164-
if let Some(byte) = data.pop_front() {
165-
if *dest != byte {
166-
panic!(
167-
r#"unexpected write:
163+
Some(expected) => {
164+
if buf != expected.as_ref() {
165+
panic!(
166+
r#"unexpected write:
168167
expected: {:02X?},
169168
received: {:02X?}"#,
170-
copy, buf
171-
);
172-
}
173-
num_bytes += 1;
174-
} else {
175-
shared.write_pending = true;
176-
return Poll::Ready(Ok(num_bytes));
177-
}
169+
expected, buf
170+
);
178171
}
179172
shared.write_pending = false;
180-
Poll::Ready(Ok(num_bytes))
173+
Poll::Ready(Ok(expected.len()))
181174
}
182175
},
183176
ChannelState::Closed => Poll::Ready(Ok(0)),

src/mock/time/mod.rs

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,17 +1,17 @@
11
mod clock;
2-
mod delay;
32
mod instant;
3+
mod sleep;
44

55
pub use std::time::Duration;
66

7-
pub use delay::Delay;
87
pub use instant::Instant;
8+
pub use sleep::Delay;
99

10-
pub fn delay_until(deadline: Instant) -> Delay {
10+
pub fn sleep_until(deadline: Instant) -> Delay {
1111
Delay::new_deadline(deadline)
1212
}
1313

14-
pub fn delay_for(delay: Duration) -> Delay {
14+
pub fn sleep(delay: Duration) -> Delay {
1515
Delay::new_delay(delay)
1616
}
1717

File renamed without changes.

src/real.rs

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -22,8 +22,8 @@ pub mod time {
2222

2323
pub use std::time::Duration; // Re-export in tokio
2424

25-
pub use time::delay_for;
26-
pub use time::delay_until;
25+
pub use time::sleep;
26+
pub use time::sleep_until;
2727
pub use time::Instant;
2828
}
2929

@@ -42,8 +42,8 @@ pub mod sync {
4242
pub mod error {
4343
use tokio::sync::mpsc::error;
4444

45+
pub use error::RecvError;
4546
pub use error::SendError;
46-
pub use error::TryRecvError;
4747
pub use error::TrySendError;
4848
}
4949
}

0 commit comments

Comments
 (0)