Skip to content

Commit 04af59f

Browse files
andrewjcgfacebook-github-bot
authored andcommitted
Allow PortReceiver to be used as Stream (#443)
Summary: Pull Request resolved: #443 Adds a `Stream` impl for `PortReceiver` to make it usable with `StreamExt` helpers. Reviewed By: mariusae, shayne-fletcher Differential Revision: D77698392 fbshipit-source-id: 868e92e8ae0412f5decc4869efc20d5056575835
1 parent 4cd0095 commit 04af59f

File tree

1 file changed

+9
-0
lines changed

1 file changed

+9
-0
lines changed

hyperactor/src/mailbox.rs

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -89,6 +89,7 @@ use dashmap::DashMap;
8989
use dashmap::DashSet;
9090
use dashmap::mapref::entry::Entry;
9191
use futures::Sink;
92+
use futures::Stream;
9293
use serde::Deserialize;
9394
use serde::Serialize;
9495
use serde::de::DeserializeOwned;
@@ -1650,6 +1651,14 @@ impl<M> Drop for PortReceiver<M> {
16501651
}
16511652
}
16521653

1654+
impl<M> Stream for PortReceiver<M> {
1655+
type Item = Result<M, MailboxError>;
1656+
1657+
fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
1658+
std::pin::pin!(self.recv()).poll(cx).map(Some)
1659+
}
1660+
}
1661+
16531662
/// A receiver of M-typed messages from [`OncePort`]s.
16541663
pub struct OncePortReceiver<M> {
16551664
receiver: Option<oneshot::Receiver<M>>,

0 commit comments

Comments
 (0)