Skip to content

Commit 3f7355d

Browse files
andrewjcgfacebook-github-bot
authored andcommitted
Add helper to convert PortRef to a Sink (#434)
Summary: Pull Request resolved: #434 Makes it easier to combine `PortRef` with `SinkExt` helpers. Reviewed By: mariusae, highker, shayne-fletcher Differential Revision: D77694800 fbshipit-source-id: 7246f78e4a1a0f0610175664c187fc00b17b077f
1 parent 5930c52 commit 3f7355d

File tree

2 files changed

+41
-0
lines changed

2 files changed

+41
-0
lines changed

hyperactor/src/mailbox.rs

Lines changed: 35 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -88,6 +88,7 @@ use async_trait::async_trait;
8888
use dashmap::DashMap;
8989
use dashmap::DashSet;
9090
use dashmap::mapref::entry::Entry;
91+
use futures::Sink;
9192
use serde::Deserialize;
9293
use serde::Serialize;
9394
use serde::de::DeserializeOwned;
@@ -108,6 +109,7 @@ use crate::actor::Signal;
108109
use crate::actor::remote::USER_PORT_OFFSET;
109110
use crate::attrs::Attrs;
110111
use crate::cap;
112+
use crate::cap::CanSend;
111113
use crate::channel;
112114
use crate::channel::ChannelAddr;
113115
use crate::channel::ChannelError;
@@ -970,6 +972,39 @@ impl MailboxSender for MailboxClient {
970972
}
971973
}
972974

975+
/// Wrapper to turn `PortRef` into a `Sink`.
976+
pub struct PortSink<'a, C: CanSend, M: RemoteMessage> {
977+
caps: &'a C,
978+
port: PortRef<M>,
979+
}
980+
981+
impl<'a, C: CanSend, M: RemoteMessage> PortSink<'a, C, M> {
982+
/// Create new PortSink
983+
pub fn new(caps: &'a C, port: PortRef<M>) -> Self {
984+
Self { caps, port }
985+
}
986+
}
987+
988+
impl<'a, C: CanSend, M: RemoteMessage> Sink<M> for PortSink<'a, C, M> {
989+
type Error = MailboxSenderError;
990+
991+
fn poll_ready(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
992+
Poll::Ready(Ok(()))
993+
}
994+
995+
fn start_send(self: Pin<&mut Self>, item: M) -> Result<(), Self::Error> {
996+
self.port.send(self.caps, item)
997+
}
998+
999+
fn poll_flush(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
1000+
Poll::Ready(Ok(()))
1001+
}
1002+
1003+
fn poll_close(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
1004+
Poll::Ready(Ok(()))
1005+
}
1006+
}
1007+
9731008
/// A mailbox coordinates message delivery to actors through typed
9741009
/// [`Port`]s associated with the mailbox.
9751010
#[derive(Clone, Debug)]

hyperactor/src/reference.rs

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -51,6 +51,7 @@ use crate::cap;
5151
use crate::data::Serialized;
5252
use crate::mailbox::MailboxSenderError;
5353
use crate::mailbox::MailboxSenderErrorKind;
54+
use crate::mailbox::PortSink;
5455
use crate::message::Bind;
5556
use crate::message::Bindings;
5657
use crate::message::Unbind;
@@ -893,6 +894,11 @@ impl<M: RemoteMessage> PortRef<M> {
893894
pub fn send_serialized(&self, caps: &impl cap::CanSend, message: Serialized, headers: Attrs) {
894895
caps.post(self.port_id.clone(), headers, message);
895896
}
897+
898+
/// Convert this port into a sink that can be used to send messages using the given capability.
899+
pub fn into_sink<'a, C: cap::CanSend>(self, caps: &'a C) -> PortSink<'a, C, M> {
900+
PortSink::new(caps, self)
901+
}
896902
}
897903

898904
impl<M: RemoteMessage> Clone for PortRef<M> {

0 commit comments

Comments
 (0)