Skip to content

Commit 37d15b7

Browse files
ibraheemdevtaiki-e
authored andcommitted
Abortable streams (#2410)
1 parent 5912e2e commit 37d15b7

File tree

8 files changed

+274
-155
lines changed

8 files changed

+274
-155
lines changed

futures-util/src/abortable.rs

Lines changed: 185 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,185 @@
1+
use crate::task::AtomicWaker;
2+
use alloc::sync::Arc;
3+
use core::fmt;
4+
use core::pin::Pin;
5+
use core::sync::atomic::{AtomicBool, Ordering};
6+
use futures_core::future::Future;
7+
use futures_core::task::{Context, Poll};
8+
use futures_core::Stream;
9+
use pin_project_lite::pin_project;
10+
11+
pin_project! {
12+
/// A future/stream which can be remotely short-circuited using an `AbortHandle`.
13+
#[derive(Debug, Clone)]
14+
#[must_use = "futures/streams do nothing unless you poll them"]
15+
pub struct Abortable<T> {
16+
#[pin]
17+
task: T,
18+
inner: Arc<AbortInner>,
19+
}
20+
}
21+
22+
impl<T> Abortable<T> {
23+
/// Creates a new `Abortable` future/stream using an existing `AbortRegistration`.
24+
/// `AbortRegistration`s can be acquired through `AbortHandle::new`.
25+
///
26+
/// When `abort` is called on the handle tied to `reg` or if `abort` has
27+
/// already been called, the future/stream will complete immediately without making
28+
/// any further progress.
29+
///
30+
/// # Examples:
31+
///
32+
/// Usage with futures:
33+
///
34+
/// ```
35+
/// # futures::executor::block_on(async {
36+
/// use futures::future::{Abortable, AbortHandle, Aborted};
37+
///
38+
/// let (abort_handle, abort_registration) = AbortHandle::new_pair();
39+
/// let future = Abortable::new(async { 2 }, abort_registration);
40+
/// abort_handle.abort();
41+
/// assert_eq!(future.await, Err(Aborted));
42+
/// # });
43+
/// ```
44+
///
45+
/// Usage with streams:
46+
///
47+
/// ```
48+
/// # futures::executor::block_on(async {
49+
/// # use futures::future::{Abortable, AbortHandle};
50+
/// # use futures::stream::{self, StreamExt};
51+
///
52+
/// let (abort_handle, abort_registration) = AbortHandle::new_pair();
53+
/// let mut stream = Abortable::new(stream::iter(vec![1, 2, 3]), abort_registration);
54+
/// abort_handle.abort();
55+
/// assert_eq!(stream.next().await, None);
56+
/// # });
57+
/// ```
58+
pub fn new(task: T, reg: AbortRegistration) -> Self {
59+
Self { task, inner: reg.inner }
60+
}
61+
62+
/// Checks whether the task has been aborted. Note that all this
63+
/// method indicates is whether [`AbortHandle::abort`] was *called*.
64+
/// This means that it will return `true` even if:
65+
/// * `abort` was called after the task had completed.
66+
/// * `abort` was called while the task was being polled - the task may still be running and
67+
/// will not be stopped until `poll` returns.
68+
pub fn is_aborted(&self) -> bool {
69+
self.inner.aborted.load(Ordering::Relaxed)
70+
}
71+
}
72+
73+
/// A registration handle for an `Abortable` task.
74+
/// Values of this type can be acquired from `AbortHandle::new` and are used
75+
/// in calls to `Abortable::new`.
76+
#[derive(Debug)]
77+
pub struct AbortRegistration {
78+
inner: Arc<AbortInner>,
79+
}
80+
81+
/// A handle to an `Abortable` task.
82+
#[derive(Debug, Clone)]
83+
pub struct AbortHandle {
84+
inner: Arc<AbortInner>,
85+
}
86+
87+
impl AbortHandle {
88+
/// Creates an (`AbortHandle`, `AbortRegistration`) pair which can be used
89+
/// to abort a running future or stream.
90+
///
91+
/// This function is usually paired with a call to [`Abortable::new`].
92+
pub fn new_pair() -> (Self, AbortRegistration) {
93+
let inner =
94+
Arc::new(AbortInner { waker: AtomicWaker::new(), aborted: AtomicBool::new(false) });
95+
96+
(Self { inner: inner.clone() }, AbortRegistration { inner })
97+
}
98+
}
99+
100+
// Inner type storing the waker to awaken and a bool indicating that it
101+
// should be aborted.
102+
#[derive(Debug)]
103+
struct AbortInner {
104+
waker: AtomicWaker,
105+
aborted: AtomicBool,
106+
}
107+
108+
/// Indicator that the `Abortable` task was aborted.
109+
#[derive(Copy, Clone, Debug, Eq, PartialEq)]
110+
pub struct Aborted;
111+
112+
impl fmt::Display for Aborted {
113+
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
114+
write!(f, "`Abortable` future has been aborted")
115+
}
116+
}
117+
118+
#[cfg(feature = "std")]
119+
impl std::error::Error for Aborted {}
120+
121+
impl<T> Abortable<T> {
122+
fn try_poll<I>(
123+
mut self: Pin<&mut Self>,
124+
cx: &mut Context<'_>,
125+
poll: impl Fn(Pin<&mut T>, &mut Context<'_>) -> Poll<I>,
126+
) -> Poll<Result<I, Aborted>> {
127+
// Check if the task has been aborted
128+
if self.is_aborted() {
129+
return Poll::Ready(Err(Aborted));
130+
}
131+
132+
// attempt to complete the task
133+
if let Poll::Ready(x) = poll(self.as_mut().project().task, cx) {
134+
return Poll::Ready(Ok(x));
135+
}
136+
137+
// Register to receive a wakeup if the task is aborted in the future
138+
self.inner.waker.register(cx.waker());
139+
140+
// Check to see if the task was aborted between the first check and
141+
// registration.
142+
// Checking with `is_aborted` which uses `Relaxed` is sufficient because
143+
// `register` introduces an `AcqRel` barrier.
144+
if self.is_aborted() {
145+
return Poll::Ready(Err(Aborted));
146+
}
147+
148+
Poll::Pending
149+
}
150+
}
151+
152+
impl<Fut> Future for Abortable<Fut>
153+
where
154+
Fut: Future,
155+
{
156+
type Output = Result<Fut::Output, Aborted>;
157+
158+
fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
159+
self.try_poll(cx, |fut, cx| fut.poll(cx))
160+
}
161+
}
162+
163+
impl<St> Stream for Abortable<St>
164+
where
165+
St: Stream,
166+
{
167+
type Item = St::Item;
168+
169+
fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
170+
self.try_poll(cx, |stream, cx| stream.poll_next(cx)).map(Result::ok).map(Option::flatten)
171+
}
172+
}
173+
174+
impl AbortHandle {
175+
/// Abort the `Abortable` stream/future associated with this handle.
176+
///
177+
/// Notifies the Abortable task associated with this handle that it
178+
/// should abort. Note that if the task is currently being polled on
179+
/// another thread, it will not immediately stop running. Instead, it will
180+
/// continue to run until its poll method returns.
181+
pub fn abort(&self) {
182+
self.inner.aborted.store(true, Ordering::Relaxed);
183+
self.inner.waker.wake();
184+
}
185+
}

futures-util/src/future/abortable.rs

Lines changed: 4 additions & 154 deletions
Original file line numberDiff line numberDiff line change
@@ -1,101 +1,8 @@
11
use super::assert_future;
2-
use crate::task::AtomicWaker;
3-
use alloc::sync::Arc;
4-
use core::fmt;
5-
use core::pin::Pin;
6-
use core::sync::atomic::{AtomicBool, Ordering};
2+
use crate::future::{AbortHandle, Abortable, Aborted};
73
use futures_core::future::Future;
8-
use futures_core::task::{Context, Poll};
9-
use pin_project_lite::pin_project;
104

11-
pin_project! {
12-
/// A future which can be remotely short-circuited using an `AbortHandle`.
13-
#[derive(Debug, Clone)]
14-
#[must_use = "futures do nothing unless you `.await` or poll them"]
15-
pub struct Abortable<Fut> {
16-
#[pin]
17-
future: Fut,
18-
inner: Arc<AbortInner>,
19-
}
20-
}
21-
22-
impl<Fut> Abortable<Fut>
23-
where
24-
Fut: Future,
25-
{
26-
/// Creates a new `Abortable` future using an existing `AbortRegistration`.
27-
/// `AbortRegistration`s can be acquired through `AbortHandle::new`.
28-
///
29-
/// When `abort` is called on the handle tied to `reg` or if `abort` has
30-
/// already been called, the future will complete immediately without making
31-
/// any further progress.
32-
///
33-
/// Example:
34-
///
35-
/// ```
36-
/// # futures::executor::block_on(async {
37-
/// use futures::future::{Abortable, AbortHandle, Aborted};
38-
///
39-
/// let (abort_handle, abort_registration) = AbortHandle::new_pair();
40-
/// let future = Abortable::new(async { 2 }, abort_registration);
41-
/// abort_handle.abort();
42-
/// assert_eq!(future.await, Err(Aborted));
43-
/// # });
44-
/// ```
45-
pub fn new(future: Fut, reg: AbortRegistration) -> Self {
46-
assert_future::<Result<Fut::Output, Aborted>, _>(Self { future, inner: reg.inner })
47-
}
48-
}
49-
50-
/// A registration handle for a `Abortable` future.
51-
/// Values of this type can be acquired from `AbortHandle::new` and are used
52-
/// in calls to `Abortable::new`.
53-
#[derive(Debug)]
54-
pub struct AbortRegistration {
55-
inner: Arc<AbortInner>,
56-
}
57-
58-
/// A handle to a `Abortable` future.
59-
#[derive(Debug, Clone)]
60-
pub struct AbortHandle {
61-
inner: Arc<AbortInner>,
62-
}
63-
64-
impl AbortHandle {
65-
/// Creates an (`AbortHandle`, `AbortRegistration`) pair which can be used
66-
/// to abort a running future.
67-
///
68-
/// This function is usually paired with a call to `Abortable::new`.
69-
///
70-
/// Example:
71-
///
72-
/// ```
73-
/// # futures::executor::block_on(async {
74-
/// use futures::future::{Abortable, AbortHandle, Aborted};
75-
///
76-
/// let (abort_handle, abort_registration) = AbortHandle::new_pair();
77-
/// let future = Abortable::new(async { 2 }, abort_registration);
78-
/// abort_handle.abort();
79-
/// assert_eq!(future.await, Err(Aborted));
80-
/// # });
81-
/// ```
82-
pub fn new_pair() -> (Self, AbortRegistration) {
83-
let inner =
84-
Arc::new(AbortInner { waker: AtomicWaker::new(), cancel: AtomicBool::new(false) });
85-
86-
(Self { inner: inner.clone() }, AbortRegistration { inner })
87-
}
88-
}
89-
90-
// Inner type storing the waker to awaken and a bool indicating that it
91-
// should be cancelled.
92-
#[derive(Debug)]
93-
struct AbortInner {
94-
waker: AtomicWaker,
95-
cancel: AtomicBool,
96-
}
97-
98-
/// Creates a new `Abortable` future and a `AbortHandle` which can be used to stop it.
5+
/// Creates a new `Abortable` future and an `AbortHandle` which can be used to stop it.
996
///
1007
/// This function is a convenient (but less flexible) alternative to calling
1018
/// `AbortHandle::new` and `Abortable::new` manually.
@@ -107,63 +14,6 @@ where
10714
Fut: Future,
10815
{
10916
let (handle, reg) = AbortHandle::new_pair();
110-
(Abortable::new(future, reg), handle)
111-
}
112-
113-
/// Indicator that the `Abortable` future was aborted.
114-
#[derive(Copy, Clone, Debug, Eq, PartialEq)]
115-
pub struct Aborted;
116-
117-
impl fmt::Display for Aborted {
118-
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
119-
write!(f, "`Abortable` future has been aborted")
120-
}
121-
}
122-
123-
#[cfg(feature = "std")]
124-
impl std::error::Error for Aborted {}
125-
126-
impl<Fut> Future for Abortable<Fut>
127-
where
128-
Fut: Future,
129-
{
130-
type Output = Result<Fut::Output, Aborted>;
131-
132-
fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
133-
// Check if the future has been aborted
134-
if self.inner.cancel.load(Ordering::Relaxed) {
135-
return Poll::Ready(Err(Aborted));
136-
}
137-
138-
// attempt to complete the future
139-
if let Poll::Ready(x) = self.as_mut().project().future.poll(cx) {
140-
return Poll::Ready(Ok(x));
141-
}
142-
143-
// Register to receive a wakeup if the future is aborted in the... future
144-
self.inner.waker.register(cx.waker());
145-
146-
// Check to see if the future was aborted between the first check and
147-
// registration.
148-
// Checking with `Relaxed` is sufficient because `register` introduces an
149-
// `AcqRel` barrier.
150-
if self.inner.cancel.load(Ordering::Relaxed) {
151-
return Poll::Ready(Err(Aborted));
152-
}
153-
154-
Poll::Pending
155-
}
156-
}
157-
158-
impl AbortHandle {
159-
/// Abort the `Abortable` future associated with this handle.
160-
///
161-
/// Notifies the Abortable future associated with this handle that it
162-
/// should abort. Note that if the future is currently being polled on
163-
/// another thread, it will not immediately stop running. Instead, it will
164-
/// continue to run until its poll method returns.
165-
pub fn abort(&self) {
166-
self.inner.cancel.store(true, Ordering::Relaxed);
167-
self.inner.waker.wake();
168-
}
17+
let abortable = assert_future::<Result<Fut::Output, Aborted>, _>(Abortable::new(future, reg));
18+
(abortable, handle)
16919
}

futures-util/src/future/mod.rs

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -112,7 +112,9 @@ cfg_target_has_atomic! {
112112
#[cfg(feature = "alloc")]
113113
mod abortable;
114114
#[cfg(feature = "alloc")]
115-
pub use self::abortable::{abortable, Abortable, AbortHandle, AbortRegistration, Aborted};
115+
pub use crate::abortable::{Abortable, AbortHandle, AbortRegistration, Aborted};
116+
#[cfg(feature = "alloc")]
117+
pub use abortable::abortable;
116118
}
117119

118120
// Just a helper function to ensure the futures we're returning all have the

futures-util/src/lib.rs

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -336,5 +336,10 @@ pub use crate::io::{
336336
#[cfg(feature = "alloc")]
337337
pub mod lock;
338338

339+
cfg_target_has_atomic! {
340+
#[cfg(feature = "alloc")]
341+
mod abortable;
342+
}
343+
339344
mod fns;
340345
mod unfold_state;

futures-util/src/stream/abortable.rs

Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,19 @@
1+
use super::assert_stream;
2+
use crate::stream::{AbortHandle, Abortable};
3+
use crate::Stream;
4+
5+
/// Creates a new `Abortable` stream and an `AbortHandle` which can be used to stop it.
6+
///
7+
/// This function is a convenient (but less flexible) alternative to calling
8+
/// `AbortHandle::new` and `Abortable::new` manually.
9+
///
10+
/// This function is only available when the `std` or `alloc` feature of this
11+
/// library is activated, and it is activated by default.
12+
pub fn abortable<St>(stream: St) -> (Abortable<St>, AbortHandle)
13+
where
14+
St: Stream,
15+
{
16+
let (handle, reg) = AbortHandle::new_pair();
17+
let abortable = assert_stream::<St::Item, _>(Abortable::new(stream, reg));
18+
(abortable, handle)
19+
}

0 commit comments

Comments
 (0)