Skip to content

Commit 58e3e1f

Browse files
seanmonstarcramertj
authored andcommitted
Change Sink's associated item type to a generic Sink<Item>
1 parent 465ef06 commit 58e3e1f

27 files changed

+238
-224
lines changed

futures-sink/src/channel_impls.rs

Lines changed: 3 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -3,8 +3,7 @@ use futures_core::task::Waker;
33
use futures_channel::mpsc::{Sender, SendError, TrySendError, UnboundedSender};
44
use std::pin::Pin;
55

6-
impl<T> Sink for Sender<T> {
7-
type SinkItem = T;
6+
impl<T> Sink<T> for Sender<T> {
87
type SinkError = SendError;
98

109
fn poll_ready(mut self: Pin<&mut Self>, waker: &Waker) -> Poll<Result<(), Self::SinkError>> {
@@ -25,8 +24,7 @@ impl<T> Sink for Sender<T> {
2524
}
2625
}
2726

28-
impl<T> Sink for UnboundedSender<T> {
29-
type SinkItem = T;
27+
impl<T> Sink<T> for UnboundedSender<T> {
3028
type SinkError = SendError;
3129

3230
fn poll_ready(self: Pin<&mut Self>, waker: &Waker) -> Poll<Result<(), Self::SinkError>> {
@@ -47,8 +45,7 @@ impl<T> Sink for UnboundedSender<T> {
4745
}
4846
}
4947

50-
impl<'a, T> Sink for &'a UnboundedSender<T> {
51-
type SinkItem = T;
48+
impl<'a, T> Sink<T> for &'a UnboundedSender<T> {
5249
type SinkError = SendError;
5350

5451
fn poll_ready(self: Pin<&mut Self>, waker: &Waker) -> Poll<Result<(), Self::SinkError>> {

futures-sink/src/lib.rs

Lines changed: 17 additions & 27 deletions
Original file line numberDiff line numberDiff line change
@@ -48,10 +48,7 @@ use core::pin::Pin;
4848
/// importance: you can use it to send an entire stream to a sink, which is
4949
/// the simplest way to ultimately consume a stream.
5050
#[must_use = "sinks do nothing unless polled"]
51-
pub trait Sink {
52-
/// The type of value that the sink accepts.
53-
type SinkItem;
54-
51+
pub trait Sink<Item> {
5552
/// The type of value produced by the sink when an error occurs.
5653
type SinkError;
5754

@@ -88,7 +85,7 @@ pub trait Sink {
8885
///
8986
/// In most cases, if the sink encounters an error, the sink will
9087
/// permanently be unable to receive items.
91-
fn start_send(self: Pin<&mut Self>, item: Self::SinkItem)
88+
fn start_send(self: Pin<&mut Self>, item: Item)
9289
-> Result<(), Self::SinkError>;
9390

9491
/// Flush any remaining output from this sink.
@@ -119,15 +116,14 @@ pub trait Sink {
119116
fn poll_close(self: Pin<&mut Self>, waker: &Waker) -> Poll<Result<(), Self::SinkError>>;
120117
}
121118

122-
impl<'a, S: ?Sized + Sink + Unpin> Sink for &'a mut S {
123-
type SinkItem = S::SinkItem;
119+
impl<'a, S: ?Sized + Sink<Item> + Unpin, Item> Sink<Item> for &'a mut S {
124120
type SinkError = S::SinkError;
125121

126122
fn poll_ready(mut self: Pin<&mut Self>, waker: &Waker) -> Poll<Result<(), Self::SinkError>> {
127123
Pin::new(&mut **self).poll_ready(waker)
128124
}
129125

130-
fn start_send(mut self: Pin<&mut Self>, item: Self::SinkItem) -> Result<(), Self::SinkError> {
126+
fn start_send(mut self: Pin<&mut Self>, item: Item) -> Result<(), Self::SinkError> {
131127
Pin::new(&mut **self).start_send(item)
132128
}
133129

@@ -140,15 +136,14 @@ impl<'a, S: ?Sized + Sink + Unpin> Sink for &'a mut S {
140136
}
141137
}
142138

143-
impl<'a, S: ?Sized + Sink> Sink for Pin<&'a mut S> {
144-
type SinkItem = S::SinkItem;
139+
impl<'a, S: ?Sized + Sink<Item>, Item> Sink<Item> for Pin<&'a mut S> {
145140
type SinkError = S::SinkError;
146141

147142
fn poll_ready(mut self: Pin<&mut Self>, waker: &Waker) -> Poll<Result<(), Self::SinkError>> {
148143
S::poll_ready((*self).as_mut(), waker)
149144
}
150145

151-
fn start_send(mut self: Pin<&mut Self>, item: Self::SinkItem) -> Result<(), Self::SinkError> {
146+
fn start_send(mut self: Pin<&mut Self>, item: Item) -> Result<(), Self::SinkError> {
152147
S::start_send((*self).as_mut(), item)
153148
}
154149

@@ -173,15 +168,14 @@ mod if_alloc {
173168
#[derive(Copy, Clone, Debug)]
174169
pub enum VecSinkError {}
175170

176-
impl<T> Sink for ::alloc::vec::Vec<T> {
177-
type SinkItem = T;
171+
impl<T> Sink<T> for ::alloc::vec::Vec<T> {
178172
type SinkError = VecSinkError;
179173

180174
fn poll_ready(self: Pin<&mut Self>, _: &Waker) -> Poll<Result<(), Self::SinkError>> {
181175
Poll::Ready(Ok(()))
182176
}
183177

184-
fn start_send(self: Pin<&mut Self>, item: Self::SinkItem) -> Result<(), Self::SinkError> {
178+
fn start_send(self: Pin<&mut Self>, item: T) -> Result<(), Self::SinkError> {
185179
// TODO: impl<T> Unpin for Vec<T> {}
186180
unsafe { Pin::get_unchecked_mut(self) }.push(item);
187181
Ok(())
@@ -196,15 +190,14 @@ mod if_alloc {
196190
}
197191
}
198192

199-
impl<T> Sink for ::alloc::collections::VecDeque<T> {
200-
type SinkItem = T;
193+
impl<T> Sink<T> for ::alloc::collections::VecDeque<T> {
201194
type SinkError = VecSinkError;
202195

203196
fn poll_ready(self: Pin<&mut Self>, _: &Waker) -> Poll<Result<(), Self::SinkError>> {
204197
Poll::Ready(Ok(()))
205198
}
206199

207-
fn start_send(self: Pin<&mut Self>, item: Self::SinkItem) -> Result<(), Self::SinkError> {
200+
fn start_send(self: Pin<&mut Self>, item: T) -> Result<(), Self::SinkError> {
208201
// TODO: impl<T> Unpin for Vec<T> {}
209202
unsafe { Pin::get_unchecked_mut(self) }.push_back(item);
210203
Ok(())
@@ -219,15 +212,14 @@ mod if_alloc {
219212
}
220213
}
221214

222-
impl<S: ?Sized + Sink + Unpin> Sink for ::alloc::boxed::Box<S> {
223-
type SinkItem = S::SinkItem;
215+
impl<S: ?Sized + Sink<Item> + Unpin, Item> Sink<Item> for ::alloc::boxed::Box<S> {
224216
type SinkError = S::SinkError;
225217

226218
fn poll_ready(mut self: Pin<&mut Self>, waker: &Waker) -> Poll<Result<(), Self::SinkError>> {
227219
Pin::new(&mut **self).poll_ready(waker)
228220
}
229221

230-
fn start_send(mut self: Pin<&mut Self>, item: Self::SinkItem) -> Result<(), Self::SinkError> {
222+
fn start_send(mut self: Pin<&mut Self>, item: Item) -> Result<(), Self::SinkError> {
231223
Pin::new(&mut **self).start_send(item)
232224
}
233225

@@ -247,13 +239,11 @@ pub use self::if_alloc::*;
247239
#[cfg(feature = "either")]
248240
use either::Either;
249241
#[cfg(feature = "either")]
250-
impl<A, B> Sink for Either<A, B>
251-
where A: Sink,
252-
B: Sink<SinkItem=<A as Sink>::SinkItem,
253-
SinkError=<A as Sink>::SinkError>
242+
impl<A, B, Item> Sink<Item> for Either<A, B>
243+
where A: Sink<Item>,
244+
B: Sink<Item, SinkError=A::SinkError>,
254245
{
255-
type SinkItem = <A as Sink>::SinkItem;
256-
type SinkError = <A as Sink>::SinkError;
246+
type SinkError = A::SinkError;
257247

258248
fn poll_ready(self: Pin<&mut Self>, waker: &Waker) -> Poll<Result<(), Self::SinkError>> {
259249
unsafe {
@@ -264,7 +254,7 @@ impl<A, B> Sink for Either<A, B>
264254
}
265255
}
266256

267-
fn start_send(self: Pin<&mut Self>, item: Self::SinkItem) -> Result<(), Self::SinkError> {
257+
fn start_send(self: Pin<&mut Self>, item: Item) -> Result<(), Self::SinkError> {
268258
unsafe {
269259
match Pin::get_unchecked_mut(self) {
270260
Either::Left(x) => Pin::new_unchecked(x).start_send(item),

futures-util/src/compat/compat01as03.rs

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -210,16 +210,15 @@ where
210210
}
211211
}
212212

213-
impl<S, SinkItem> Sink03 for Compat01As03Sink<S, SinkItem>
213+
impl<S, SinkItem> Sink03<SinkItem> for Compat01As03Sink<S, SinkItem>
214214
where
215215
S: Sink01<SinkItem = SinkItem>,
216216
{
217-
type SinkItem = SinkItem;
218217
type SinkError = S::SinkError;
219218

220219
fn start_send(
221220
mut self: Pin<&mut Self>,
222-
item: Self::SinkItem,
221+
item: SinkItem,
223222
) -> Result<(), Self::SinkError> {
224223
debug_assert!(self.buffer.is_none());
225224
self.buffer = Some(item);

futures-util/src/compat/compat03as01.rs

Lines changed: 40 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@ use futures_core::{
1515
use futures_sink::Sink as Sink03;
1616
use crate::task::{ArcWake as ArcWake03, WakerRef};
1717
use std::{
18+
marker::PhantomData,
1819
mem,
1920
pin::Pin,
2021
sync::Arc,
@@ -32,6 +33,14 @@ pub struct Compat<T> {
3233
pub(crate) inner: T,
3334
}
3435

36+
/// Converts a futures 0.3 Sink object to a futures 0.1-compatible version
37+
#[derive(Debug)]
38+
#[must_use = "sinks do nothing unless polled"]
39+
pub struct CompatSink<T, Item> {
40+
inner: T,
41+
_phantom: PhantomData<fn(Item)>,
42+
}
43+
3544
impl<T> Compat<T> {
3645
/// Returns the inner item.
3746
pub fn into_inner(self) -> T {
@@ -48,6 +57,21 @@ impl<T> Compat<T> {
4857
}
4958
}
5059

60+
impl<T, Item> CompatSink<T, Item> {
61+
/// Returns the inner item.
62+
pub fn into_inner(self) -> T {
63+
self.inner
64+
}
65+
66+
/// Creates a new [`CompatSink`].
67+
pub fn new(inner: T) -> Self {
68+
CompatSink {
69+
inner,
70+
_phantom: PhantomData,
71+
}
72+
}
73+
}
74+
5175
fn poll_03_to_01<T, E>(x: task03::Poll<Result<T, E>>)
5276
-> Result<Async01<T>, E>
5377
{
@@ -87,18 +111,18 @@ where
87111
}
88112
}
89113

90-
impl<T> Sink01 for Compat<T>
114+
impl<T, Item> Sink01 for CompatSink<T, Item>
91115
where
92-
T: Sink03 + Unpin,
116+
T: Sink03<Item> + Unpin,
93117
{
94-
type SinkItem = T::SinkItem;
118+
type SinkItem = Item;
95119
type SinkError = T::SinkError;
96120

97121
fn start_send(
98122
&mut self,
99123
item: Self::SinkItem,
100124
) -> StartSend01<Self::SinkItem, Self::SinkError> {
101-
with_context(self, |mut inner, waker| {
125+
with_sink_context(self, |mut inner, waker| {
102126
match inner.as_mut().poll_ready(waker) {
103127
task03::Poll::Ready(Ok(())) => {
104128
inner.start_send(item).map(|()| AsyncSink01::Ready)
@@ -110,11 +134,11 @@ where
110134
}
111135

112136
fn poll_complete(&mut self) -> Poll01<(), Self::SinkError> {
113-
with_context(self, |inner, waker| poll_03_to_01(inner.poll_flush(waker)))
137+
with_sink_context(self, |inner, waker| poll_03_to_01(inner.poll_flush(waker)))
114138
}
115139

116140
fn close(&mut self) -> Poll01<(), Self::SinkError> {
117-
with_context(self, |inner, waker| poll_03_to_01(inner.poll_close(waker)))
141+
with_sink_context(self, |inner, waker| poll_03_to_01(inner.poll_close(waker)))
118142
}
119143
}
120144

@@ -172,6 +196,16 @@ where
172196
f(Pin::new(&mut compat.inner), &waker)
173197
}
174198

199+
fn with_sink_context<T, Item, R, F>(compat: &mut CompatSink<T, Item>, f: F) -> R
200+
where
201+
T: Unpin,
202+
F: FnOnce(Pin<&mut T>, &task03::Waker) -> R,
203+
{
204+
let current = Current::new();
205+
let waker = current.as_waker();
206+
f(Pin::new(&mut compat.inner), &waker)
207+
}
208+
175209
#[cfg(feature = "io-compat")]
176210
mod io {
177211
use super::*;

futures-util/src/compat/mod.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -12,4 +12,4 @@ pub use self::compat01as03::{Compat01As03, Compat01As03Sink, Future01CompatExt,
1212
pub use self::compat01as03::{AsyncRead01CompatExt, AsyncWrite01CompatExt};
1313

1414
mod compat03as01;
15-
pub use self::compat03as01::Compat;
15+
pub use self::compat03as01::{Compat, CompatSink};

futures-util/src/lib.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -45,7 +45,7 @@ pub mod core_reexport {
4545
}
4646

4747
macro_rules! delegate_sink {
48-
($field:ident) => {
48+
($field:ident, $item:ty) => {
4949
fn poll_ready(
5050
self: Pin<&mut Self>,
5151
waker: &$crate::core_reexport::task::Waker,
@@ -55,7 +55,7 @@ macro_rules! delegate_sink {
5555

5656
fn start_send(
5757
self: Pin<&mut Self>,
58-
item: Self::SinkItem
58+
item: $item,
5959
) -> Result<(), Self::SinkError> {
6060
self.$field().start_send(item)
6161
}

futures-util/src/sink/buffer.rs

Lines changed: 9 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -9,23 +9,23 @@ use alloc::collections::VecDeque;
99
/// number of values when the underlying sink is unable to accept them.
1010
#[derive(Debug)]
1111
#[must_use = "sinks do nothing unless polled"]
12-
pub struct Buffer<Si: Sink> {
12+
pub struct Buffer<Si: Sink<Item>, Item> {
1313
sink: Si,
14-
buf: VecDeque<Si::SinkItem>,
14+
buf: VecDeque<Item>,
1515

1616
// Track capacity separately from the `VecDeque`, which may be rounded up
1717
capacity: usize,
1818
}
1919

20-
impl<Si: Sink + Unpin> Unpin for Buffer<Si> {}
20+
impl<Si: Sink<Item> + Unpin, Item> Unpin for Buffer<Si, Item> {}
2121

22-
impl<Si: Sink> Buffer<Si> {
22+
impl<Si: Sink<Item>, Item> Buffer<Si, Item> {
2323
unsafe_pinned!(sink: Si);
24-
unsafe_unpinned!(buf: VecDeque<Si::SinkItem>);
24+
unsafe_unpinned!(buf: VecDeque<Item>);
2525
unsafe_unpinned!(capacity: usize);
2626

2727

28-
pub(super) fn new(sink: Si, capacity: usize) -> Buffer<Si> {
28+
pub(super) fn new(sink: Si, capacity: usize) -> Self {
2929
Buffer {
3030
sink,
3131
buf: VecDeque::with_capacity(capacity),
@@ -56,16 +56,15 @@ impl<Si: Sink> Buffer<Si> {
5656
}
5757

5858
// Forwarding impl of Stream from the underlying sink
59-
impl<S> Stream for Buffer<S> where S: Sink + Stream {
59+
impl<S, Item> Stream for Buffer<S, Item> where S: Sink<Item> + Stream {
6060
type Item = S::Item;
6161

6262
fn poll_next(self: Pin<&mut Self>, waker: &Waker) -> Poll<Option<S::Item>> {
6363
self.sink().poll_next(waker)
6464
}
6565
}
6666

67-
impl<Si: Sink> Sink for Buffer<Si> {
68-
type SinkItem = Si::SinkItem;
67+
impl<Si: Sink<Item>, Item> Sink<Item> for Buffer<Si, Item> {
6968
type SinkError = Si::SinkError;
7069

7170
fn poll_ready(
@@ -89,7 +88,7 @@ impl<Si: Sink> Sink for Buffer<Si> {
8988

9089
fn start_send(
9190
mut self: Pin<&mut Self>,
92-
item: Self::SinkItem,
91+
item: Item,
9392
) -> Result<(), Self::SinkError> {
9493
if self.capacity == 0 {
9594
self.as_mut().sink().start_send(item)

0 commit comments

Comments
 (0)