Skip to content

Commit 0a2d040

Browse files
taiki-ecramertj
authored andcommitted
Add AsyncBufRead trait
1 parent acb6872 commit 0a2d040

File tree

9 files changed

+297
-88
lines changed

9 files changed

+297
-88
lines changed

futures-io/src/lib.rs

Lines changed: 125 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -266,6 +266,63 @@ mod if_std {
266266
-> Poll<Result<u64>>;
267267
}
268268

269+
/// Read bytes asynchronously.
270+
///
271+
/// This trait is analogous to the `std::io::BufRead` trait, but integrates
272+
/// with the asynchronous task system. In particular, the `poll_fill_buf`
273+
/// method, unlike `BufRead::fill_buf`, will automatically queue the current task
274+
/// for wakeup and return if data is not yet available, rather than blocking
275+
/// the calling thread.
276+
pub trait AsyncBufRead: AsyncRead {
277+
/// Attempt to return the contents of the internal buffer, filling it with more data
278+
/// from the inner reader if it is empty.
279+
///
280+
/// On success, returns `Poll::Ready(Ok(buf))`.
281+
///
282+
/// If no data is available for reading, the method returns
283+
/// `Poll::Pending` and arranges for the current task (via
284+
/// `cx.waker().wake_by_ref()`) to receive a notification when the object becomes
285+
/// readable or is closed.
286+
///
287+
/// This function is a lower-level call. It needs to be paired with the
288+
/// [`consume`] method to function properly. When calling this
289+
/// method, none of the contents will be "read" in the sense that later
290+
/// calling [`poll_read`] may return the same contents. As such, [`consume`] must
291+
/// be called with the number of bytes that are consumed from this buffer to
292+
/// ensure that the bytes are never returned twice.
293+
///
294+
/// [`poll_read`]: AsyncRead::poll_read
295+
/// [`consume`]: AsyncBufRead::consume
296+
///
297+
/// An empty buffer returned indicates that the stream has reached EOF.
298+
///
299+
/// # Implementation
300+
///
301+
/// This function may not return errors of kind `WouldBlock` or
302+
/// `Interrupted`. Implementations must convert `WouldBlock` into
303+
/// `Poll::Pending` and either internally retry or convert
304+
/// `Interrupted` into another error kind.
305+
fn poll_fill_buf<'a>(self: Pin<&'a mut Self>, cx: &mut Context<'_>)
306+
-> Poll<Result<&'a [u8]>>;
307+
308+
/// Tells this buffer that `amt` bytes have been consumed from the buffer,
309+
/// so they should no longer be returned in calls to [`poll_read`].
310+
///
311+
/// This function is a lower-level call. It needs to be paired with the
312+
/// [`poll_fill_buf`] method to function properly. This function does
313+
/// not perform any I/O, it simply informs this object that some amount of
314+
/// its buffer, returned from [`poll_fill_buf`], has been consumed and should
315+
/// no longer be returned. As such, this function may do odd things if
316+
/// [`poll_fill_buf`] isn't called before calling it.
317+
///
318+
/// The `amt` must be `<=` the number of bytes in the buffer returned by
319+
/// [`poll_fill_buf`].
320+
///
321+
/// [`poll_read`]: AsyncRead::poll_read
322+
/// [`poll_fill_buf`]: AsyncBufRead::poll_fill_buf
323+
fn consume(self: Pin<&mut Self>, amt: usize);
324+
}
325+
269326
macro_rules! deref_async_read {
270327
() => {
271328
unsafe fn initializer(&self) -> Initializer {
@@ -340,6 +397,10 @@ mod if_std {
340397
unsafe_delegate_async_read_to_stdio!();
341398
}
342399

400+
impl AsyncRead for StdIo::Empty {
401+
unsafe_delegate_async_read_to_stdio!();
402+
}
403+
343404
impl<T: AsRef<[u8]> + Unpin> AsyncRead for StdIo::Cursor<T> {
344405
unsafe_delegate_async_read_to_stdio!();
345406
}
@@ -499,6 +560,70 @@ mod if_std {
499560
impl<T: AsRef<[u8]> + Unpin> AsyncSeek for StdIo::Cursor<T> {
500561
delegate_async_seek_to_stdio!();
501562
}
563+
564+
macro_rules! deref_async_buf_read {
565+
() => {
566+
fn poll_fill_buf<'a>(self: Pin<&'a mut Self>, cx: &mut Context<'_>)
567+
-> Poll<Result<&'a [u8]>>
568+
{
569+
Pin::new(&mut **self.get_mut()).poll_fill_buf(cx)
570+
}
571+
572+
fn consume(self: Pin<&mut Self>, amt: usize) {
573+
Pin::new(&mut **self.get_mut()).consume(amt)
574+
}
575+
}
576+
}
577+
578+
impl<T: ?Sized + AsyncBufRead + Unpin> AsyncBufRead for Box<T> {
579+
deref_async_buf_read!();
580+
}
581+
582+
impl<T: ?Sized + AsyncBufRead + Unpin> AsyncBufRead for &mut T {
583+
deref_async_buf_read!();
584+
}
585+
586+
impl<P> AsyncBufRead for Pin<P>
587+
where
588+
P: DerefMut + Unpin,
589+
P::Target: AsyncBufRead,
590+
{
591+
fn poll_fill_buf<'a>(self: Pin<&'a mut Self>, cx: &mut Context<'_>)
592+
-> Poll<Result<&'a [u8]>>
593+
{
594+
self.get_mut().as_mut().poll_fill_buf(cx)
595+
}
596+
597+
fn consume(self: Pin<&mut Self>, amt: usize) {
598+
self.get_mut().as_mut().consume(amt)
599+
}
600+
}
601+
602+
macro_rules! delegate_async_buf_read_to_stdio {
603+
() => {
604+
fn poll_fill_buf<'a>(self: Pin<&'a mut Self>, _: &mut Context<'_>)
605+
-> Poll<Result<&'a [u8]>>
606+
{
607+
Poll::Ready(StdIo::BufRead::fill_buf(self.get_mut()))
608+
}
609+
610+
fn consume(self: Pin<&mut Self>, amt: usize) {
611+
StdIo::BufRead::consume(self.get_mut(), amt)
612+
}
613+
}
614+
}
615+
616+
impl AsyncBufRead for &[u8] {
617+
delegate_async_buf_read_to_stdio!();
618+
}
619+
620+
impl AsyncBufRead for StdIo::Empty {
621+
delegate_async_buf_read_to_stdio!();
622+
}
623+
624+
impl<T: AsRef<[u8]> + Unpin> AsyncBufRead for StdIo::Cursor<T> {
625+
delegate_async_buf_read_to_stdio!();
626+
}
502627
}
503628

504629
#[cfg(feature = "std")]

futures-util/Cargo.toml

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -15,7 +15,7 @@ Common utilities and extension traits for the futures-rs library.
1515
name = "futures_util"
1616

1717
[features]
18-
std = ["alloc", "futures-core-preview/std", "futures-io-preview/std", "futures-sink-preview/std", "futures-select-macro-preview/std", "rand", "rand_core", "slab"]
18+
std = ["alloc", "futures-core-preview/std", "futures-io-preview/std", "futures-sink-preview/std", "futures-select-macro-preview/std", "rand", "rand_core", "slab", "memchr"]
1919
default = ["std"]
2020
compat = ["std", "futures_01"]
2121
io-compat = ["compat", "tokio-io"]
@@ -36,6 +36,7 @@ proc-macro-nested = "0.1.2"
3636
rand = { version = "0.6.4", optional = true }
3737
rand_core = { version = ">=0.2.2, <0.4", optional = true } # See https://github.com/rust-random/rand/issues/645
3838
slab = { version = "0.4", optional = true }
39+
memchr = { version = "2.2", optional = true }
3940
futures_01 = { version = "0.1.25", optional = true, package = "futures" }
4041
tokio-io = { version = "0.1.9", optional = true }
4142
pin-utils = "0.1.0-alpha.4"

futures-util/src/io/allow_std.rs

Lines changed: 23 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,9 +1,7 @@
11
use futures_core::task::{Context, Poll};
2-
use futures_io::{AsyncRead, AsyncWrite, AsyncSeek};
2+
use futures_io::{AsyncRead, AsyncWrite, AsyncSeek, AsyncBufRead};
33
use std::{fmt, io};
44
use std::pin::Pin;
5-
use std::string::String;
6-
use std::vec::Vec;
75

86
/// A simple wrapper type which allows types which implement only
97
/// implement `std::io::Read` or `std::io::Write`
@@ -130,3 +128,25 @@ impl<T> AsyncSeek for AllowStdIo<T> where T: io::Seek {
130128
Poll::Ready(Ok(try_with_interrupt!(self.0.seek(pos))))
131129
}
132130
}
131+
132+
impl<T> io::BufRead for AllowStdIo<T> where T: io::BufRead {
133+
fn fill_buf(&mut self) -> io::Result<&[u8]> {
134+
self.0.fill_buf()
135+
}
136+
fn consume(&mut self, amt: usize) {
137+
self.0.consume(amt)
138+
}
139+
}
140+
141+
impl<T> AsyncBufRead for AllowStdIo<T> where T: io::BufRead {
142+
fn poll_fill_buf<'a>(mut self: Pin<&'a mut Self>, _: &mut Context<'_>)
143+
-> Poll<io::Result<&'a [u8]>>
144+
{
145+
let this: *mut Self = &mut *self as *mut _;
146+
Poll::Ready(Ok(try_with_interrupt!(unsafe { &mut *this }.0.fill_buf())))
147+
}
148+
149+
fn consume(mut self: Pin<&mut Self>, amt: usize) {
150+
self.0.consume(amt)
151+
}
152+
}

futures-util/src/io/disabled/read_until.rs

Lines changed: 0 additions & 70 deletions
This file was deleted.

futures-util/src/io/mod.rs

Lines changed: 72 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -5,18 +5,12 @@
55
//! `AsyncReadExt` and `AsyncWriteExt` traits which add methods
66
//! to the `AsyncRead` and `AsyncWrite` types.
77
8-
use std::vec::Vec;
9-
10-
pub use futures_io::{AsyncRead, AsyncWrite, AsyncSeek, IoVec, SeekFrom};
8+
pub use futures_io::{
9+
AsyncRead, AsyncWrite, AsyncSeek, AsyncBufRead, IoVec, SeekFrom,
10+
};
1111

1212
#[cfg(feature = "io-compat")] use crate::compat::Compat;
1313

14-
// Temporarily removed until AsyncBufRead is implemented
15-
// pub use io::lines::{lines, Lines};
16-
// pub use io::read_until::{read_until, ReadUntil};
17-
// mod lines;
18-
// mod read_until;
19-
2014
mod allow_std;
2115
pub use self::allow_std::AllowStdIo;
2216

@@ -26,15 +20,26 @@ pub use self::copy_into::CopyInto;
2620
mod flush;
2721
pub use self::flush::Flush;
2822

23+
// TODO
24+
// mod lines;
25+
// pub use self::lines::Lines;
26+
2927
mod read;
3028
pub use self::read::Read;
3129

3230
mod read_exact;
3331
pub use self::read_exact::ReadExact;
3432

33+
// TODO
34+
// mod read_line;
35+
// pub use self::read_line::ReadLine;
36+
3537
mod read_to_end;
3638
pub use self::read_to_end::ReadToEnd;
3739

40+
mod read_until;
41+
pub use self::read_until::ReadUntil;
42+
3843
mod close;
3944
pub use self::close::Close;
4045

@@ -343,3 +348,61 @@ pub trait AsyncSeekExt: AsyncSeek {
343348
}
344349

345350
impl<S: AsyncSeek + ?Sized> AsyncSeekExt for S {}
351+
352+
/// An extension trait which adds utility methods to `AsyncBufRead` types.
353+
pub trait AsyncBufReadExt: AsyncBufRead {
354+
/// Creates a future which will read all the bytes associated with this I/O
355+
/// object into `buf` until the delimiter `byte` or EOF is reached.
356+
/// This method is the async equivalent to [`BufRead::read_until`](std::io::BufRead::read_until).
357+
///
358+
/// This function will read bytes from the underlying stream until the
359+
/// delimiter or EOF is found. Once found, all bytes up to, and including,
360+
/// the delimiter (if found) will be appended to `buf`.
361+
///
362+
/// The returned future will resolve to the number of bytes read once the read
363+
/// operation is completed.
364+
///
365+
/// In the case of an error the buffer and the object will be discarded, with
366+
/// the error yielded.
367+
///
368+
/// # Examples
369+
///
370+
/// ```
371+
/// #![feature(async_await, await_macro)]
372+
/// # futures::executor::block_on(async {
373+
/// use futures::io::AsyncBufReadExt;
374+
/// use std::io::Cursor;
375+
///
376+
/// let mut cursor = Cursor::new(b"lorem-ipsum");
377+
/// let mut buf = vec![];
378+
///
379+
/// // cursor is at 'l'
380+
/// let num_bytes = await!(cursor.read_until(b'-', &mut buf))?;
381+
/// assert_eq!(num_bytes, 6);
382+
/// assert_eq!(buf, b"lorem-");
383+
/// buf.clear();
384+
///
385+
/// // cursor is at 'i'
386+
/// let num_bytes = await!(cursor.read_until(b'-', &mut buf))?;
387+
/// assert_eq!(num_bytes, 5);
388+
/// assert_eq!(buf, b"ipsum");
389+
/// buf.clear();
390+
///
391+
/// // cursor is at EOF
392+
/// let num_bytes = await!(cursor.read_until(b'-', &mut buf))?;
393+
/// assert_eq!(num_bytes, 0);
394+
/// assert_eq!(buf, b"");
395+
/// # Ok::<(), Box<std::error::Error>>(()) }).unwrap();
396+
/// ```
397+
fn read_until<'a>(
398+
&'a mut self,
399+
byte: u8,
400+
buf: &'a mut Vec<u8>,
401+
) -> ReadUntil<'a, Self>
402+
where Self: Unpin,
403+
{
404+
ReadUntil::new(self, byte, buf)
405+
}
406+
}
407+
408+
impl<R: AsyncBufRead + ?Sized> AsyncBufReadExt for R {}

0 commit comments

Comments
 (0)