Skip to content

Invert rsync architecture #467

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 1 commit into
base: main
Choose a base branch
from
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
149 changes: 114 additions & 35 deletions hyperactor_mesh/src/code_sync/rsync.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,16 +15,21 @@ use std::time::Duration;

use anyhow::Context;
use anyhow::Result;
use anyhow::anyhow;
use anyhow::bail;
use anyhow::ensure;
use async_trait::async_trait;
use futures::StreamExt;
use futures::TryFutureExt;
use futures::TryStreamExt;
use futures::stream;
use futures::try_join;
use hyperactor::Actor;
use hyperactor::Bind;
use hyperactor::Handler;
use hyperactor::Named;
use hyperactor::OncePortRef;
use hyperactor::Unbind;
use hyperactor::clock::Clock;
use hyperactor::clock::RealClock;
use nix::sys::signal;
Expand Down Expand Up @@ -56,8 +61,8 @@ pub async fn do_rsync(addr: &SocketAddr, workspace: &Path) -> Result<()> {
.arg("--delay-updates")
.arg("--exclude=.rsync-tmp.*")
.arg(format!("--partial-dir=.rsync-tmp.{}", addr.port()))
.arg(format!("{}/", workspace.display()))
.arg(format!("rsync://{}/workspace", addr))
.arg(format!("{}/", workspace.display()))
.stderr(Stdio::piped())
.output()
.await?;
Expand Down Expand Up @@ -89,8 +94,8 @@ impl RsyncDaemon {
path = {workspace}
use chroot = no
list = no
read only = false
write only = true
read only = true
write only = false
uid = {uid}
hosts allow = localhost
"#,
Expand Down Expand Up @@ -158,43 +163,58 @@ impl RsyncDaemon {
}
}

/// Request handled by [`RsyncActor`] to run a single rsync transfer.
///
/// The handler accepts the byte stream described by `connect`, runs `do_rsync`
/// against a local listener piped to that stream, and reports the outcome on
/// `result`.
#[derive(Debug, Named, Serialize, Deserialize, Bind, Unbind)]
pub struct RsyncMessage {
/// The connect message to create a duplex bytestream with the client.
pub connect: Connect,
/// One-shot port on which the actor reports the rsync outcome: `Ok(())` on
/// success, otherwise the error formatted as a string.
pub result: OncePortRef<Result<(), String>>,
}

/// Spawn parameters for [`RsyncActor`] (used as its `Actor::Params`).
#[derive(Debug, Named, Serialize, Deserialize)]
pub struct RsyncParams {
/// Location of the workspace the actor operates on; it is turned into a
/// concrete path with `resolve()` before use.
pub workspace: WorkspaceLocation,
}

#[derive(Debug)]
#[hyperactor::export(
spawn = true,
handlers = [Connect { cast = true }],
)]
#[hyperactor::export(spawn = true, handlers = [RsyncMessage { cast = true }])]
pub struct RsyncActor {
daemon: RsyncDaemon,
workspace: WorkspaceLocation,
//daemon: RsyncDaemon,
}

#[async_trait]
impl Actor for RsyncActor {
type Params = RsyncParams;

async fn new(RsyncParams { workspace }: Self::Params) -> Result<Self> {
let workspace = workspace.resolve()?;
let daemon = RsyncDaemon::spawn(TcpListener::bind(("::1", 0)).await?, &workspace).await?;
Ok(Self { daemon })
Ok(Self { workspace })
}
}

#[async_trait]
impl Handler<Connect> for RsyncActor {
impl Handler<RsyncMessage> for RsyncActor {
async fn handle(
&mut self,
cx: &hyperactor::Context<Self>,
message: Connect,
RsyncMessage { connect, result }: RsyncMessage,
) -> Result<(), anyhow::Error> {
let (mut local, mut stream) = try_join!(
async { Ok(TcpStream::connect(self.daemon.addr()).await?) },
accept(cx, cx.self_id().clone(), message),
)?;
tokio::io::copy_bidirectional(&mut local, &mut stream).await?;
let res = async {
let workspace = self.workspace.resolve()?;
let (listener, mut stream) = try_join!(
TcpListener::bind(("::1", 0)).err_into(),
accept(cx, cx.self_id().clone(), connect),
)?;
let addr = listener.local_addr()?;
try_join!(do_rsync(&addr, &workspace), async move {
let (mut local, _) = listener.accept().await?;
tokio::io::copy_bidirectional(&mut stream, &mut local).await?;
anyhow::Ok(())
},)?;
anyhow::Ok(())
}
.await;
result.send(cx, res.map_err(|e| format!("{:#?}", e)))?;
Ok(())
}
}
Expand All @@ -203,45 +223,62 @@ pub async fn rsync_mesh<M>(actor_mesh: &M, workspace: PathBuf) -> Result<()>
where
M: ActorMesh<Actor = RsyncActor>,
{
// Spawn an rsync daemon to accept incoming connections from actors.
let daemon = RsyncDaemon::spawn(TcpListener::bind(("::1", 0)).await?, &workspace).await?;
let daemon_addr = daemon.addr();

// We avoid casting here as we need point-to-point connections to each individual actor.
stream::iter(actor_mesh.iter_actor_refs())
.map(anyhow::Ok)
// Connect to all actors in the mesh.
.map(|actor| async move {
.try_for_each_concurrent(None, |actor| async move {
let mailbox = actor_mesh.proc_mesh().client();
let (connect, completer) = Connect::allocate(mailbox.actor_id().clone(), mailbox);
actor.send(mailbox, connect)?;
completer.complete().await
})
// Max the connections run in parallel.
.buffer_unordered(usize::MAX)
// Initiate the rsync, in parallel.
.try_for_each_concurrent(None, |mut local| async {
let workspace = workspace.clone();
let listener = TcpListener::bind(("::1", 0)).await?;
let addr = listener.local_addr()?;
try_join!(
async move { do_rsync(&addr, &workspace).await },
async move {
let (mut stream, _) = listener.accept().await?;
tokio::io::copy_bidirectional(&mut stream, &mut local).await?;
anyhow::Ok(())
let (tx, rx) = mailbox.open_once_port::<Result<(), String>>();
actor.send(
mailbox,
RsyncMessage {
connect,
result: tx.bind(),
},
)?;
let (mut local, mut stream) = try_join!(
TcpStream::connect(daemon_addr.clone()).err_into(),
completer.complete(),
)?;
// Pipe the remote rsync client to the local rsync server, but don't propagate failures yet.
let copy_res = tokio::io::copy_bidirectional(&mut local, &mut stream).await;
// Now wait for the final result to be sent back. We wrap in a timeout, as we should get this
// back pretty quickly after the copy above is done.
let () = RealClock
.timeout(Duration::from_secs(1), rx.recv())
.await??
.map_err(|err| anyhow!("failure from {}: {}", actor.actor_id(), err))?;
// Finally, propagate any copy errors, in case there were some but no result error.
let _ = copy_res?;
anyhow::Ok(())
})
.await?;

daemon.shutdown().await?;

Ok(())
}

#[cfg(test)]
mod tests {
use anyhow::Result;
use anyhow::anyhow;
use ndslice::shape;
use tempfile::TempDir;
use tokio::fs;
use tokio::net::TcpListener;

use super::*;
use crate::alloc::AllocSpec;
use crate::alloc::Allocator;
use crate::alloc::local::LocalAllocator;
use crate::proc_mesh::ProcMesh;

#[tokio::test]
async fn test_simple() -> Result<()> {
Expand All @@ -259,4 +296,46 @@ mod tests {

Ok(())
}

/// End-to-end check: fill a source directory, spawn an `RsyncActor` mesh that
/// targets an empty directory, run `rsync_mesh`, and assert that the two
/// directory trees no longer differ.
#[tokio::test]
async fn test_rsync_actor_and_mesh() -> Result<()> {
    // Source tree: two top-level files plus one file inside a subdirectory.
    let src_dir = TempDir::new()?;
    let src_root = src_dir.path();
    for (name, body) in [("test1.txt", "content1"), ("test2.txt", "content2")] {
        fs::write(src_root.join(name), body).await?;
    }
    fs::create_dir(src_root.join("subdir")).await?;
    fs::write(src_root.join("subdir/test3.txt"), "content3").await?;

    // Empty destination directory handed to the actors as their workspace.
    let dst_dir = TempDir::new()?;

    // Allocate a local proc mesh with shape `replica = 1`.
    let spec = AllocSpec {
        shape: shape! { replica = 1 },
        constraints: Default::default(),
    };
    let procs = ProcMesh::allocate(LocalAllocator.allocate(spec).await?).await?;

    // Every actor in the mesh is given the same constant target workspace.
    let actor_params = RsyncParams {
        workspace: WorkspaceLocation::Constant(dst_dir.path().to_path_buf()),
    };
    let mesh = procs.spawn::<RsyncActor>("rsync_test", &actor_params).await?;

    // Drive the sync from the source directory across the whole mesh.
    rsync_mesh(&mesh, src_root.to_path_buf()).await?;

    // The target must now mirror the source exactly.
    let differs = dir_diff::is_different(&src_dir, &dst_dir).map_err(|e| anyhow!("{:?}", e))?;
    assert!(!differs);

    Ok(())
}
}
Loading