Skip to content

Commit 20fc9a8

Browse files
Thomasdezeeuwcramertj
authored andcommitted
Merge InterleavePending and InterleavePendingWrite
1 parent eccfa19 commit 20fc9a8

File tree

6 files changed

+167
-197
lines changed

6 files changed

+167
-197
lines changed
Lines changed: 161 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,161 @@
1+
use futures_io::{self as io, AsyncBufRead, AsyncRead, AsyncWrite};
2+
use pin_utils::{unsafe_pinned, unsafe_unpinned};
3+
use std::{
4+
pin::Pin,
5+
task::{Context, Poll},
6+
};
7+
8+
/// I/O wrapper for interleaving [`Poll::Pending`] in calls to read or write.
9+
///
10+
/// See the [`interleave_pending`] and [`interleave_pending_write`] methods.
11+
///
12+
/// [`interleave_pending`]: super::AsyncReadTestExt::interleave_pending
13+
/// [`interleave_pending_write`]: super::AsyncWriteTestExt::interleave_pending_write
14+
#[derive(Debug)]
15+
pub struct InterleavePending<IO> {
16+
io: IO,
17+
pended: bool,
18+
}
19+
20+
impl<IO: Unpin> Unpin for InterleavePending<IO> {}
21+
22+
impl<IO> InterleavePending<IO> {
23+
unsafe_pinned!(io: IO);
24+
unsafe_unpinned!(pended: bool);
25+
26+
pub(crate) fn new(io: IO) -> Self {
27+
Self { io, pended: false }
28+
}
29+
30+
/// Acquires a reference to the underlying I/O object that this adaptor is
31+
/// wrapping.
32+
pub fn get_ref(&self) -> &IO {
33+
&self.io
34+
}
35+
36+
/// Acquires a mutable reference to the underlying I/O object that this
37+
/// adaptor is wrapping.
38+
pub fn get_mut(&mut self) -> &mut IO {
39+
&mut self.io
40+
}
41+
42+
/// Acquires a pinned mutable reference to the underlying I/O object that
43+
/// this adaptor is wrapping.
44+
pub fn get_pin_mut<'a>(self: Pin<&'a mut Self>) -> Pin<&'a mut IO> {
45+
self.project().0
46+
}
47+
48+
/// Consumes this adaptor returning the underlying I/O object.
49+
pub fn into_inner(self) -> IO {
50+
self.io
51+
}
52+
53+
fn project<'a>(self: Pin<&'a mut Self>) -> (Pin<&'a mut IO>, &'a mut bool) {
54+
unsafe {
55+
let this = self.get_unchecked_mut();
56+
(Pin::new_unchecked(&mut this.io), &mut this.pended)
57+
}
58+
}
59+
}
60+
61+
impl<W: AsyncWrite> AsyncWrite for InterleavePending<W> {
62+
fn poll_write(
63+
self: Pin<&mut Self>,
64+
cx: &mut Context<'_>,
65+
buf: &[u8],
66+
) -> Poll<io::Result<usize>> {
67+
let (writer, pended) = self.project();
68+
if *pended {
69+
let next = writer.poll_write(cx, buf);
70+
if next.is_ready() {
71+
*pended = false;
72+
}
73+
next
74+
} else {
75+
cx.waker().wake_by_ref();
76+
*pended = true;
77+
Poll::Pending
78+
}
79+
}
80+
81+
fn poll_flush(
82+
self: Pin<&mut Self>,
83+
cx: &mut Context<'_>,
84+
) -> Poll<io::Result<()>> {
85+
let (writer, pended) = self.project();
86+
if *pended {
87+
let next = writer.poll_flush(cx);
88+
if next.is_ready() {
89+
*pended = false;
90+
}
91+
next
92+
} else {
93+
cx.waker().wake_by_ref();
94+
*pended = true;
95+
Poll::Pending
96+
}
97+
}
98+
99+
fn poll_close(
100+
self: Pin<&mut Self>,
101+
cx: &mut Context<'_>,
102+
) -> Poll<io::Result<()>> {
103+
let (writer, pended) = self.project();
104+
if *pended {
105+
let next = writer.poll_close(cx);
106+
if next.is_ready() {
107+
*pended = false;
108+
}
109+
next
110+
} else {
111+
cx.waker().wake_by_ref();
112+
*pended = true;
113+
Poll::Pending
114+
}
115+
}
116+
}
117+
118+
impl<R: AsyncRead> AsyncRead for InterleavePending<R> {
119+
fn poll_read(
120+
self: Pin<&mut Self>,
121+
cx: &mut Context<'_>,
122+
buf: &mut [u8],
123+
) -> Poll<io::Result<usize>> {
124+
let (reader, pended) = self.project();
125+
if *pended {
126+
let next = reader.poll_read(cx, buf);
127+
if next.is_ready() {
128+
*pended = false;
129+
}
130+
next
131+
} else {
132+
cx.waker().wake_by_ref();
133+
*pended = true;
134+
Poll::Pending
135+
}
136+
}
137+
}
138+
139+
impl<R: AsyncBufRead> AsyncBufRead for InterleavePending<R> {
140+
fn poll_fill_buf<'a>(
141+
self: Pin<&'a mut Self>,
142+
cx: &mut Context<'_>,
143+
) -> Poll<io::Result<&'a [u8]>> {
144+
let (reader, pended) = self.project();
145+
if *pended {
146+
let next = reader.poll_fill_buf(cx);
147+
if next.is_ready() {
148+
*pended = false;
149+
}
150+
next
151+
} else {
152+
cx.waker().wake_by_ref();
153+
*pended = true;
154+
Poll::Pending
155+
}
156+
}
157+
158+
fn consume(self: Pin<&mut Self>, amount: usize) {
159+
self.io().consume(amount)
160+
}
161+
}

futures-test/src/io/mod.rs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,7 @@
11
//! Additional combinators for testing async IO.
22
3+
mod interleave_pending;
4+
35
pub mod read;
46
pub use read::AsyncReadTestExt;
57

futures-test/src/io/read/interleave_pending.rs

Lines changed: 0 additions & 79 deletions
This file was deleted.

futures-test/src/io/read/mod.rs

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -2,8 +2,7 @@
22
33
use futures_io::AsyncRead;
44

5-
mod interleave_pending;
6-
pub use self::interleave_pending::InterleavePending;
5+
pub use super::interleave_pending::InterleavePending;
76

87
/// Additional combinators for testing async readers.
98
pub trait AsyncReadTestExt: AsyncRead {

futures-test/src/io/write/interleave_pending_write.rs

Lines changed: 0 additions & 112 deletions
This file was deleted.

futures-test/src/io/write/mod.rs

Lines changed: 3 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -2,8 +2,7 @@
22
33
use futures_io::AsyncWrite;
44

5-
mod interleave_pending_write;
6-
pub use self::interleave_pending_write::InterleavePendingWrite;
5+
pub use super::interleave_pending::InterleavePending;
76

87
mod limited_write;
98
pub use self::limited_write::LimitedWrite;
@@ -45,11 +44,11 @@ pub trait AsyncWriteTestExt: AsyncWrite {
4544
///
4645
/// # Ok::<(), std::io::Error>(())
4746
/// ```
48-
fn interleave_pending_write(self) -> InterleavePendingWrite<Self>
47+
fn interleave_pending_write(self) -> InterleavePending<Self>
4948
where
5049
Self: Sized,
5150
{
52-
InterleavePendingWrite::new(self)
51+
InterleavePending::new(self)
5352
}
5453

5554
/// Limit the number of bytes allowed to be written on each call to `poll_write`.

0 commit comments

Comments
 (0)