Skip to content

Commit bf4d5b2

Browse files
andrewjcgfacebook-github-bot
authored andcommitted
Don't close-on-drop if already shutdown (#536)
Summary: Pull Request resolved: #536 Fix a bug where we'd send another EOF (to a potentially closed channel) after we'd already shutdown the stream. Reviewed By: highker Differential Revision: D78286093 fbshipit-source-id: f8b2f799ac71e977ef7d83be8381987a588f9b5c
1 parent cb8ac0c commit bf4d5b2

File tree

1 file changed

+64
-7
lines changed

1 file changed

+64
-7
lines changed

hyperactor_mesh/src/connect.rs

Lines changed: 64 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -64,6 +64,7 @@ use hyperactor::message::Bind;
6464
use hyperactor::message::Bindings;
6565
use hyperactor::message::Unbind;
6666
use pin_project::pin_project;
67+
use pin_project::pinned_drop;
6768
use serde::Deserialize;
6869
use serde::Serialize;
6970
use tokio::io::AsyncRead;
@@ -94,10 +95,15 @@ pub struct OwnedReadHalf {
9495
}
9596

9697
/// Wrap a `PortRef<IoMsg>` as a `AsyncWrite`.
98+
#[pin_project(PinnedDrop)]
9799
pub struct OwnedWriteHalf<C: CanSend> {
98100
peer: ActorId,
101+
#[pin]
99102
caps: C,
103+
#[pin]
100104
port: PortRef<Io>,
105+
#[pin]
106+
shutdown: bool,
101107
}
102108

103109
/// A duplex bytestream connection between two actors. Can generally be used like a `TcpStream`.
@@ -144,7 +150,12 @@ impl OwnedReadHalf {
144150

145151
impl<C: CanSend> OwnedWriteHalf<C> {
146152
fn new(peer: ActorId, caps: C, port: PortRef<Io>) -> Self {
147-
Self { peer, caps, port }
153+
Self {
154+
peer,
155+
caps,
156+
port,
157+
shutdown: false,
158+
}
148159
}
149160

150161
pub fn peer(&self) -> &ActorId {
@@ -159,10 +170,13 @@ impl<C: CanSend> OwnedWriteHalf<C> {
159170
}
160171
}
161172

162-
impl<C: CanSend> Drop for OwnedWriteHalf<C> {
163-
fn drop(&mut self) {
164-
// Send EOF on drop.
165-
let _ = self.port.send(&self.caps, Io::Eof);
173+
#[pinned_drop]
174+
impl<C: CanSend> PinnedDrop for OwnedWriteHalf<C> {
175+
fn drop(self: Pin<&mut Self>) {
176+
let this = self.project();
177+
if !*this.shutdown {
178+
let _ = this.port.send(&*this.caps, Io::Eof);
179+
}
166180
}
167181
}
168182

@@ -247,7 +261,14 @@ impl<C: CanSend> AsyncWrite for OwnedWriteHalf<C> {
247261
_cx: &mut Context<'_>,
248262
buf: &[u8],
249263
) -> Poll<Result<usize, std::io::Error>> {
250-
match self.port.send(&self.caps, Io::Data(buf.into())) {
264+
let this = self.project();
265+
if *this.shutdown {
266+
return Poll::Ready(Err(std::io::Error::new(
267+
std::io::ErrorKind::BrokenPipe,
268+
"write after shutdown",
269+
)));
270+
}
271+
match this.port.send(&*this.caps, Io::Data(buf.into())) {
251272
Ok(()) => Poll::Ready(Ok(buf.len())),
252273
Err(e) => Poll::Ready(Err(std::io::Error::other(e))),
253274
}
@@ -263,7 +284,11 @@ impl<C: CanSend> AsyncWrite for OwnedWriteHalf<C> {
263284
) -> Poll<Result<(), std::io::Error>> {
264285
// Send EOF on shutdown.
265286
match self.port.send(&self.caps, Io::Eof) {
266-
Ok(()) => Poll::Ready(Ok(())),
287+
Ok(()) => {
288+
let mut this = self.project();
289+
*this.shutdown = true;
290+
Poll::Ready(Ok(()))
291+
}
267292
Err(e) => Poll::Ready(Err(std::io::Error::other(e))),
268293
}
269294
}
@@ -471,4 +496,36 @@ mod tests {
471496

472497
Ok(())
473498
}
499+
500+
#[tokio::test]
501+
async fn test_no_eof_on_drop_after_shutdown() -> Result<()> {
502+
let proc = Proc::local();
503+
let client = proc.attach("client")?;
504+
505+
let (connect, completer) = Connect::allocate(client.actor_id().clone(), client.clone());
506+
let (mut rd, _) = accept(&client, client.actor_id().clone(), connect)
507+
.await?
508+
.into_split();
509+
let (_, mut wr) = completer.complete().await?.into_split();
510+
511+
// Write some data
512+
let send = [1u8, 2u8, 3u8];
513+
wr.write_all(&send).await?;
514+
515+
// Explicitly shutdown the writer - this sends EOF and sets shutdown=true
516+
wr.shutdown().await?;
517+
518+
// Reader should receive the data and then EOF (from explicit shutdown, not from drop)
519+
let mut recv = vec![];
520+
rd.read_to_end(&mut recv).await?;
521+
assert_eq!(&send, recv.as_slice());
522+
523+
// Drop the writer after explicit shutdown - this should NOT send another EOF
524+
drop(wr);
525+
526+
// Verify we didn't see another EOF message.
527+
assert!(rd.inner.into_inner().port.try_recv().unwrap().is_none());
528+
529+
Ok(())
530+
}
474531
}

0 commit comments

Comments
 (0)