diff --git a/hyperactor_mesh/src/code_sync/rsync.rs b/hyperactor_mesh/src/code_sync/rsync.rs index 585d16d6..bf9a630b 100644 --- a/hyperactor_mesh/src/code_sync/rsync.rs +++ b/hyperactor_mesh/src/code_sync/rsync.rs @@ -15,16 +15,21 @@ use std::time::Duration; use anyhow::Context; use anyhow::Result; +use anyhow::anyhow; use anyhow::bail; use anyhow::ensure; use async_trait::async_trait; use futures::StreamExt; +use futures::TryFutureExt; use futures::TryStreamExt; use futures::stream; use futures::try_join; use hyperactor::Actor; +use hyperactor::Bind; use hyperactor::Handler; use hyperactor::Named; +use hyperactor::OncePortRef; +use hyperactor::Unbind; use hyperactor::clock::Clock; use hyperactor::clock::RealClock; use nix::sys::signal; @@ -56,8 +61,8 @@ pub async fn do_rsync(addr: &SocketAddr, workspace: &Path) -> Result<()> { .arg("--delay-updates") .arg("--exclude=.rsync-tmp.*") .arg(format!("--partial-dir=.rsync-tmp.{}", addr.port())) - .arg(format!("{}/", workspace.display())) .arg(format!("rsync://{}/workspace", addr)) + .arg(format!("{}/", workspace.display())) .stderr(Stdio::piped()) .output() .await?; @@ -89,8 +94,8 @@ impl RsyncDaemon { path = {workspace} use chroot = no list = no - read only = false - write only = true + read only = true + write only = false uid = {uid} hosts allow = localhost "#, @@ -158,18 +163,24 @@ impl RsyncDaemon { } } +#[derive(Debug, Named, Serialize, Deserialize, Bind, Unbind)] +pub struct RsyncMessage { + /// The connect message to create a duplex bytestream with the client. + pub connect: Connect, + /// A port to send back any errors from the rsync. + pub result: OncePortRef>, +} + #[derive(Debug, Named, Serialize, Deserialize)] pub struct RsyncParams { pub workspace: WorkspaceLocation, } #[derive(Debug)] -#[hyperactor::export( - spawn = true, - handlers = [Connect { cast = true }], -)] +#[hyperactor::export(spawn = true, handlers = [RsyncMessage { cast = true }])] pub struct RsyncActor { - daemon: RsyncDaemon, + workspace: WorkspaceLocation, + //daemon: RsyncDaemon, } #[async_trait] @@ -177,24 +188,33 @@ impl Actor for RsyncActor { type Params = RsyncParams; async fn new(RsyncParams { workspace }: Self::Params) -> Result { - let workspace = workspace.resolve()?; - let daemon = RsyncDaemon::spawn(TcpListener::bind(("::1", 0)).await?, &workspace).await?; - Ok(Self { daemon }) + Ok(Self { workspace }) } } #[async_trait] -impl Handler for RsyncActor { +impl Handler for RsyncActor { async fn handle( &mut self, cx: &hyperactor::Context, - message: Connect, + RsyncMessage { connect, result }: RsyncMessage, ) -> Result<(), anyhow::Error> { - let (mut local, mut stream) = try_join!( - async { Ok(TcpStream::connect(self.daemon.addr()).await?) }, - accept(cx, cx.self_id().clone(), message), - )?; - tokio::io::copy_bidirectional(&mut local, &mut stream).await?; + let res = async { + let workspace = self.workspace.resolve()?; + let (listener, mut stream) = try_join!( + TcpListener::bind(("::1", 0)).err_into(), + accept(cx, cx.self_id().clone(), connect), + )?; + let addr = listener.local_addr()?; + try_join!(do_rsync(&addr, &workspace), async move { + let (mut local, _) = listener.accept().await?; + tokio::io::copy_bidirectional(&mut stream, &mut local).await?; + anyhow::Ok(()) + },)?; + anyhow::Ok(()) + } + .await; + result.send(cx, res.map_err(|e| format!("{:#?}", e)))?; Ok(()) } } @@ -203,33 +223,45 @@ pub async fn rsync_mesh(actor_mesh: &M, workspace: PathBuf) -> Result<()> where M: ActorMesh, { + // Spawn a rsync daemon to acceopt incoming connections from actors. + let daemon = RsyncDaemon::spawn(TcpListener::bind(("::1", 0)).await?, &workspace).await?; + let daemon_addr = daemon.addr(); + // We avoid casting here as we need point-to-point connections to each individual actor. stream::iter(actor_mesh.iter_actor_refs()) + .map(anyhow::Ok) // Connect to all actors in the mesh. - .map(|actor| async move { + .try_for_each_concurrent(None, |actor| async move { let mailbox = actor_mesh.proc_mesh().client(); let (connect, completer) = Connect::allocate(mailbox.actor_id().clone(), mailbox); - actor.send(mailbox, connect)?; - completer.complete().await - }) - // Max the connections run in parallel. - .buffer_unordered(usize::MAX) - // Initiate the rsync, in parallel. - .try_for_each_concurrent(None, |mut local| async { - let workspace = workspace.clone(); - let listener = TcpListener::bind(("::1", 0)).await?; - let addr = listener.local_addr()?; - try_join!( - async move { do_rsync(&addr, &workspace).await }, - async move { - let (mut stream, _) = listener.accept().await?; - tokio::io::copy_bidirectional(&mut stream, &mut local).await?; - anyhow::Ok(()) + let (tx, rx) = mailbox.open_once_port::>(); + actor.send( + mailbox, + RsyncMessage { + connect, + result: tx.bind(), }, )?; + let (mut local, mut stream) = try_join!( + TcpStream::connect(daemon_addr.clone()).err_into(), + completer.complete(), + )?; + // Pipe the remote rsync client to the local rsync server, but don't propagate failures yet. + let copy_res = tokio::io::copy_bidirectional(&mut local, &mut stream).await; + // Now wait for the final result to be sent back. We wrap in a timeout, as we should get this + // back pretty quickly after the copy above is done. + let () = RealClock + .timeout(Duration::from_secs(1), rx.recv()) + .await?? + .map_err(|err| anyhow!("failure from {}: {}", actor.actor_id(), err))?; + // Finally, propagate any copy errors, in case there were some but not result error. + let _ = copy_res?; anyhow::Ok(()) }) .await?; + + daemon.shutdown().await?; + Ok(()) } @@ -237,11 +269,16 @@ where mod tests { use anyhow::Result; use anyhow::anyhow; + use ndslice::shape; use tempfile::TempDir; use tokio::fs; use tokio::net::TcpListener; use super::*; + use crate::alloc::AllocSpec; + use crate::alloc::Allocator; + use crate::alloc::local::LocalAllocator; + use crate::proc_mesh::ProcMesh; #[tokio::test] async fn test_simple() -> Result<()> { @@ -259,4 +296,46 @@ mod tests { Ok(()) } + + #[tokio::test] + async fn test_rsync_actor_and_mesh() -> Result<()> { + // Create source workspace with test files + let source_workspace = TempDir::new()?; + fs::write(source_workspace.path().join("test1.txt"), "content1").await?; + fs::write(source_workspace.path().join("test2.txt"), "content2").await?; + fs::create_dir(source_workspace.path().join("subdir")).await?; + fs::write(source_workspace.path().join("subdir/test3.txt"), "content3").await?; + + // Create target workspace for the actors + let target_workspace = TempDir::new()?; + + // Set up actor mesh with 2 RsyncActors + let alloc = LocalAllocator + .allocate(AllocSpec { + shape: shape! { replica = 1 }, + constraints: Default::default(), + }) + .await?; + + let proc_mesh = ProcMesh::allocate(alloc).await?; + + // Create RsyncParams - all actors will use the same target workspace for this test + let params = RsyncParams { + workspace: WorkspaceLocation::Constant(target_workspace.path().to_path_buf()), + }; + + // Spawn actor mesh with RsyncActors + let actor_mesh = proc_mesh.spawn::("rsync_test", ¶ms).await?; + + // Test rsync_mesh function - this coordinates rsync operations across the mesh + rsync_mesh(&actor_mesh, source_workspace.path().to_path_buf()).await?; + + // Verify we copied correctly. + assert!( + !dir_diff::is_different(&source_workspace, &target_workspace) + .map_err(|e| anyhow!("{:?}", e))? + ); + + Ok(()) + } }