Skip to content

Commit d5a3976

Browse files
taiki-eNemo157
authored andcommitted
impl Sink for stream combinators
1 parent 8735d40 commit d5a3976

File tree

15 files changed

+94
-84
lines changed

15 files changed

+94
-84
lines changed

futures-util/src/stream/chunks.rs

Lines changed: 5 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
use crate::stream::Fuse;
22
use futures_core::stream::Stream;
33
use futures_core::task::{Waker, Poll};
4+
use futures_sink::Sink;
45
use pin_utils::{unsafe_pinned, unsafe_unpinned};
56
use core::mem;
67
use core::pin::Pin;
@@ -100,14 +101,12 @@ impl<St: Stream> Stream for Chunks<St> {
100101
}
101102
}
102103

103-
/* TODO
104104
// Forwarding impl of Sink from the underlying stream
105-
impl<S> Sink for Chunks<S>
106-
where S: Sink + Stream
105+
impl<S, Item> Sink<Item> for Chunks<S>
106+
where
107+
S: Stream + Sink<Item>,
107108
{
108-
type SinkItem = S::SinkItem;
109109
type SinkError = S::SinkError;
110110

111-
delegate_sink!(stream);
111+
delegate_sink!(stream, Item);
112112
}
113-
*/

futures-util/src/stream/filter.rs

Lines changed: 6 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@ use core::pin::Pin;
22
use futures_core::future::Future;
33
use futures_core::stream::{FusedStream, Stream};
44
use futures_core::task::{Waker, Poll};
5+
use futures_sink::Sink;
56
use pin_utils::{unsafe_pinned, unsafe_unpinned};
67

78
/// A stream combinator used to filter the results of a stream and only yield
@@ -113,17 +114,13 @@ impl<St, Fut, F> Stream for Filter<St, Fut, F>
113114
}
114115
}
115116

116-
/* TODO
117117
// Forwarding impl of Sink from the underlying stream
118-
impl<S, R, P> Sink for Filter<S, R, P>
119-
where S: Stream,
120-
P: FnMut(&S::Item) -> R,
121-
R: Future<Item = bool>,
122-
S: Sink,
118+
impl<S, Fut, F, Item> Sink<Item> for Filter<S, Fut, F>
119+
where S: Stream + Sink<Item>,
120+
F: FnMut(&S::Item) -> Fut,
121+
Fut: Future<Output = bool>,
123122
{
124-
type SinkItem = S::SinkItem;
125123
type SinkError = S::SinkError;
126124

127-
delegate_sink!(stream);
125+
delegate_sink!(stream, Item);
128126
}
129-
*/

futures-util/src/stream/filter_map.rs

Lines changed: 6 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@ use core::pin::Pin;
22
use futures_core::future::Future;
33
use futures_core::stream::{FusedStream, Stream};
44
use futures_core::task::{Waker, Poll};
5+
use futures_sink::Sink;
56
use pin_utils::{unsafe_pinned, unsafe_unpinned};
67

78
/// A combinator used to filter the results of a stream and simultaneously map
@@ -103,16 +104,13 @@ impl<St, Fut, F, T> Stream for FilterMap<St, Fut, F>
103104
}
104105
}
105106

106-
/* TODO
107107
// Forwarding impl of Sink from the underlying stream
108-
impl<S, R, F> Sink for FilterMap<S, R, F>
109-
where S: Stream + Sink,
110-
F: FnMut(S::Item) -> R,
111-
R: IntoFuture<Error=S::Error>,
108+
impl<S, Fut, F, Item> Sink<Item> for FilterMap<S, Fut, F>
109+
where S: Stream + Sink<Item>,
110+
F: FnMut(S::Item) -> Fut,
111+
Fut: Future,
112112
{
113-
type SinkItem = S::SinkItem;
114113
type SinkError = S::SinkError;
115114

116-
delegate_sink!(stream);
115+
delegate_sink!(stream, Item);
117116
}
118-
*/

futures-util/src/stream/flatten.rs

Lines changed: 5 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
use core::pin::Pin;
22
use futures_core::stream::{FusedStream, Stream};
33
use futures_core::task::{Waker, Poll};
4+
use futures_sink::Sink;
45
use pin_utils::unsafe_pinned;
56

67
/// A combinator used to flatten a stream-of-streams into one long stream of
@@ -89,14 +90,12 @@ impl<St> Stream for Flatten<St>
8990
}
9091
}
9192

92-
/* TODO
9393
// Forwarding impl of Sink from the underlying stream
94-
impl<S> Sink for Flatten<S>
95-
where S: Sink + Stream
94+
impl<S, Item> Sink<Item> for Flatten<S>
95+
where S: Stream + Sink<Item>,
96+
S::Item: Stream,
9697
{
97-
type SinkItem = S::SinkItem;
9898
type SinkError = S::SinkError;
9999

100-
delegate_sink!(stream);
100+
delegate_sink!(stream, Item);
101101
}
102-
*/

futures-util/src/stream/inspect.rs

Lines changed: 6 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
use core::pin::Pin;
22
use futures_core::stream::{FusedStream, Stream};
33
use futures_core::task::{Waker, Poll};
4+
use futures_sink::Sink;
45
use pin_utils::{unsafe_pinned, unsafe_unpinned};
56

67
/// Do something with the items of a stream, passing it on.
@@ -17,7 +18,7 @@ impl<St: Stream + Unpin, F> Unpin for Inspect<St, F> {}
1718

1819
impl<St, F> Inspect<St, F>
1920
where St: Stream,
20-
F: FnMut(&St::Item) -> (),
21+
F: FnMut(&St::Item),
2122
{
2223
unsafe_pinned!(stream: St);
2324
unsafe_unpinned!(f: F);
@@ -74,14 +75,12 @@ impl<St, F> Stream for Inspect<St, F>
7475
}
7576
}
7677

77-
/* TODO
7878
// Forwarding impl of Sink from the underlying stream
79-
impl<S, F> Sink for Inspect<S, F>
80-
where S: Sink + Stream
79+
impl<S, F, Item> Sink<Item> for Inspect<S, F>
80+
where S: Stream + Sink<Item>,
81+
F: FnMut(&S::Item),
8182
{
82-
type SinkItem = S::SinkItem;
8383
type SinkError = S::SinkError;
8484

85-
delegate_sink!(stream);
85+
delegate_sink!(stream, Item);
8686
}
87-
*/

futures-util/src/stream/map.rs

Lines changed: 5 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
use core::pin::Pin;
22
use futures_core::stream::{FusedStream, Stream};
33
use futures_core::task::{Waker, Poll};
4+
use futures_sink::Sink;
45
use pin_utils::{unsafe_pinned, unsafe_unpinned};
56

67
/// A stream combinator which will change the type of a stream from one
@@ -72,14 +73,12 @@ impl<St, F, T> Stream for Map<St, F>
7273
}
7374
}
7475

75-
/* TODO
7676
// Forwarding impl of Sink from the underlying stream
77-
impl<S, F> Sink for Map<S, F>
78-
where S: Sink
77+
impl<S, F, T, Item> Sink<Item> for Map<S, F>
78+
where S: Stream + Sink<Item>,
79+
F: FnMut(S::Item) -> T,
7980
{
80-
type SinkItem = S::SinkItem;
8181
type SinkError = S::SinkError;
8282

83-
delegate_sink!(stream);
83+
delegate_sink!(stream, Item);
8484
}
85-
*/

futures-util/src/stream/peek.rs

Lines changed: 4 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@ use crate::stream::{StreamExt, Fuse};
22
use core::pin::Pin;
33
use futures_core::stream::{FusedStream, Stream};
44
use futures_core::task::{Waker, Poll};
5+
use futures_sink::Sink;
56
use pin_utils::{unsafe_pinned, unsafe_unpinned};
67

78
/// A `Stream` that implements a `peek` method.
@@ -72,14 +73,11 @@ impl<S: Stream> Stream for Peekable<S> {
7273
}
7374
}
7475

75-
/* TODO
7676
// Forwarding impl of Sink from the underlying stream
77-
impl<S> Sink for Peekable<S>
78-
where S: Sink + Stream
77+
impl<S, Item> Sink<Item> for Peekable<S>
78+
where S: Sink<Item> + Stream
7979
{
80-
type SinkItem = S::SinkItem;
8180
type SinkError = S::SinkError;
8281

83-
delegate_sink!(stream);
82+
delegate_sink!(stream, Item);
8483
}
85-
*/

futures-util/src/stream/skip.rs

Lines changed: 5 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
use core::pin::Pin;
22
use futures_core::stream::{FusedStream, Stream};
33
use futures_core::task::{Waker, Poll};
4+
use futures_sink::Sink;
45
use pin_utils::{unsafe_pinned, unsafe_unpinned};
56

67
/// A stream combinator which skips a number of elements before continuing.
@@ -74,14 +75,12 @@ impl<St: Stream> Stream for Skip<St> {
7475
}
7576
}
7677

77-
/* TODO
7878
// Forwarding impl of Sink from the underlying stream
79-
impl<S> Sink for Skip<S>
80-
where S: Sink
79+
impl<S, Item> Sink<Item> for Skip<S>
80+
where
81+
S: Stream + Sink<Item>,
8182
{
82-
type SinkItem = S::SinkItem;
8383
type SinkError = S::SinkError;
8484

85-
delegate_sink!(stream);
85+
delegate_sink!(stream, Item);
8686
}
87-
*/

futures-util/src/stream/skip_while.rs

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@ use core::pin::Pin;
22
use futures_core::future::Future;
33
use futures_core::stream::{FusedStream, Stream};
44
use futures_core::task::{Waker, Poll};
5+
use futures_sink::Sink;
56
use pin_utils::{unsafe_pinned, unsafe_unpinned};
67

78
/// A stream combinator which skips elements of a stream while a predicate
@@ -109,14 +110,13 @@ impl<St, Fut, F> Stream for SkipWhile<St, Fut, F>
109110
}
110111
}
111112

112-
/* TODO
113113
// Forwarding impl of Sink from the underlying stream
114-
impl<S, R, P> Sink for SkipWhile<S, R, P>
115-
where S: Sink + Stream, R: IntoFuture
114+
impl<S, Fut, F, Item> Sink<Item> for SkipWhile<S, Fut, F>
115+
where S: Stream + Sink<Item>,
116+
F: FnMut(&S::Item) -> Fut,
117+
Fut: Future<Output = bool>,
116118
{
117-
type SinkItem = S::SinkItem;
118119
type SinkError = S::SinkError;
119120

120-
delegate_sink!(stream);
121+
delegate_sink!(stream, Item);
121122
}
122-
*/

futures-util/src/stream/take.rs

Lines changed: 4 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
use core::pin::Pin;
22
use futures_core::stream::Stream;
33
use futures_core::task::{Waker, Poll};
4+
use futures_sink::Sink;
45
use pin_utils::{unsafe_pinned, unsafe_unpinned};
56

67
/// A stream combinator which returns a maximum number of elements.
@@ -72,14 +73,11 @@ impl<St> Stream for Take<St>
7273
}
7374
}
7475

75-
/* TODO
7676
// Forwarding impl of Sink from the underlying stream
77-
impl<S> Sink for Take<S>
78-
where S: Sink + Stream
77+
impl<S, Item> Sink<Item> for Take<S>
78+
where S: Stream + Sink<Item>,
7979
{
80-
type SinkItem = S::SinkItem;
8180
type SinkError = S::SinkError;
8281

83-
delegate_sink!(stream);
82+
delegate_sink!(stream, Item);
8483
}
85-
*/

0 commit comments

Comments
 (0)