Skip to content

Commit 41f6283

Browse files
andrewjcgfacebook-github-bot
authored andcommitted
Cleanup/refactor connection helper library (#412)
Summary: Pull Request resolved: #412 This diff does some refactor of the connection helper: 1) Add a `ActorConnection` wrapper which to model the connection which, similar to `TcpStream` can be split into owned read and write halves. 2) Make the `AsyncRead`/`AsyncWrite`s that represent the connection have a `.peer()` method to return the `ActorId` of the other side. Reviewed By: shayne-fletcher, mariusae Differential Revision: D77596371 fbshipit-source-id: 96aa5d5ea271d0dc14d7b1f52f6eb818e06b6459
1 parent 56e0056 commit 41f6283

File tree

4 files changed

+330
-155
lines changed

4 files changed

+330
-155
lines changed

hyperactor_mesh/Cargo.toml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -40,6 +40,7 @@ libc = "0.2.139"
4040
mockall = "0.13.1"
4141
ndslice = { version = "0.0.0", path = "../ndslice" }
4242
nix = { version = "0.29.0", features = ["dir", "event", "hostname", "inotify", "ioctl", "mman", "mount", "net", "poll", "ptrace", "reboot", "resource", "sched", "signal", "term", "time", "user", "zerocopy"] }
43+
pin-project = "0.4.30"
4344
preempt_rwlock = { version = "0.0.0", path = "../preempt_rwlock" }
4445
rand = { version = "0.8", features = ["small_rng"] }
4546
serde = { version = "1.0.185", features = ["derive", "rc"] }

hyperactor_mesh/src/code_sync/rsync.rs

Lines changed: 33 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,9 @@ use anyhow::Result;
1818
use anyhow::bail;
1919
use anyhow::ensure;
2020
use async_trait::async_trait;
21+
use futures::StreamExt;
22+
use futures::TryStreamExt;
23+
use futures::stream;
2124
use futures::try_join;
2225
use hyperactor::Actor;
2326
use hyperactor::Handler;
@@ -40,7 +43,6 @@ use crate::actor_mesh::ActorMesh;
4043
use crate::code_sync::WorkspaceLocation;
4144
use crate::connect::Connect;
4245
use crate::connect::accept;
43-
use crate::connect::connect_mesh;
4446

4547
pub async fn do_rsync(addr: &SocketAddr, workspace: &Path) -> Result<()> {
4648
let output = Command::new("rsync")
@@ -190,36 +192,45 @@ impl Handler<Connect> for RsyncActor {
190192
) -> Result<(), anyhow::Error> {
191193
let (mut local, mut stream) = try_join!(
192194
async { Ok(TcpStream::connect(self.daemon.addr()).await?) },
193-
async {
194-
let (rd, wr) = accept(cx, message).await?;
195-
anyhow::Ok(tokio::io::join(rd, wr))
196-
}
195+
accept(cx, cx.self_id().clone(), message),
197196
)?;
198197
tokio::io::copy_bidirectional(&mut local, &mut stream).await?;
199198
Ok(())
200199
}
201200
}
202201

203-
pub async fn rsync_mesh<M>(actor_mesh: M, workspace: PathBuf) -> Result<()>
202+
pub async fn rsync_mesh<M>(actor_mesh: &M, workspace: PathBuf) -> Result<()>
204203
where
205204
M: ActorMesh<Actor = RsyncActor>,
206205
{
207-
connect_mesh(actor_mesh, async move |rd, wr| {
208-
let workspace = workspace.clone();
209-
let listener = TcpListener::bind(("::1", 0)).await?;
210-
let addr = listener.local_addr()?;
211-
let mut local = tokio::io::join(rd, wr);
212-
try_join!(
213-
async move { do_rsync(&addr, &workspace).await },
214-
async move {
215-
let (mut stream, _) = listener.accept().await?;
216-
tokio::io::copy_bidirectional(&mut stream, &mut local).await?;
217-
anyhow::Ok(())
218-
},
219-
)?;
220-
anyhow::Ok(())
221-
})
222-
.await
206+
// We avoid casting here as we need point-to-point connections to each individual actor.
207+
stream::iter(actor_mesh.iter_actor_refs())
208+
// Connect to all actors in the mesh.
209+
.map(|actor| async move {
210+
let mailbox = actor_mesh.proc_mesh().client();
211+
let (connect, completer) = Connect::allocate(mailbox.actor_id().clone(), mailbox);
212+
actor.send(mailbox, connect)?;
213+
completer.complete().await
214+
})
215+
// Max the connections run in parallel.
216+
.buffer_unordered(usize::MAX)
217+
// Initiate the rsync, in parallel.
218+
.try_for_each_concurrent(None, |mut local| async {
219+
let workspace = workspace.clone();
220+
let listener = TcpListener::bind(("::1", 0)).await?;
221+
let addr = listener.local_addr()?;
222+
try_join!(
223+
async move { do_rsync(&addr, &workspace).await },
224+
async move {
225+
let (mut stream, _) = listener.accept().await?;
226+
tokio::io::copy_bidirectional(&mut stream, &mut local).await?;
227+
anyhow::Ok(())
228+
},
229+
)?;
230+
anyhow::Ok(())
231+
})
232+
.await?;
233+
Ok(())
223234
}
224235

225236
#[cfg(test)]

0 commit comments

Comments
 (0)