Skip to content

Commit f44d99f

Browse files
andrewjcgfacebook-github-bot
authored andcommitted
Invert rsync architecture
Summary: This inverts the rsync setup, so that the rsync daemon runs on the "client", and the rsync clients run on the actors. This helps in a couple ways: - We don't leave a rsync daemon running on the actors. - We avoid spawning lots of sub-processes for each rsync client on the "client" - The rsync client supports reporting file changes, which subsequent changes can use to facilitate things like targeted module reloading (based on the actual things that were changed). Differential Revision: D77952087
1 parent 1e06a7c commit f44d99f

File tree

1 file changed

+117
-35
lines changed

1 file changed

+117
-35
lines changed

hyperactor_mesh/src/code_sync/rsync.rs

Lines changed: 117 additions & 35 deletions
Original file line numberDiff line numberDiff line change
@@ -19,12 +19,16 @@ use anyhow::bail;
1919
use anyhow::ensure;
2020
use async_trait::async_trait;
2121
use futures::StreamExt;
22+
use futures::TryFutureExt;
2223
use futures::TryStreamExt;
2324
use futures::stream;
2425
use futures::try_join;
2526
use hyperactor::Actor;
27+
use hyperactor::Bind;
2628
use hyperactor::Handler;
2729
use hyperactor::Named;
30+
use hyperactor::OncePortRef;
31+
use hyperactor::Unbind;
2832
use hyperactor::clock::Clock;
2933
use hyperactor::clock::RealClock;
3034
use nix::sys::signal;
@@ -56,8 +60,8 @@ pub async fn do_rsync(addr: &SocketAddr, workspace: &Path) -> Result<()> {
5660
.arg("--delay-updates")
5761
.arg("--exclude=.rsync-tmp.*")
5862
.arg(format!("--partial-dir=.rsync-tmp.{}", addr.port()))
59-
.arg(format!("{}/", workspace.display()))
6063
.arg(format!("rsync://{}/workspace", addr))
64+
.arg(format!("{}/", workspace.display()))
6165
.stderr(Stdio::piped())
6266
.output()
6367
.await?;
@@ -89,8 +93,8 @@ impl RsyncDaemon {
8993
path = {workspace}
9094
use chroot = no
9195
list = no
92-
read only = false
93-
write only = true
96+
read only = true
97+
write only = false
9498
uid = {uid}
9599
hosts allow = localhost
96100
"#,
@@ -158,43 +162,61 @@ impl RsyncDaemon {
158162
}
159163
}
160164

165+
#[derive(Debug, Named, Serialize, Deserialize, Bind, Unbind)]
166+
pub struct RsyncMessage {
167+
/// The connect message to create a duplex bytestream with the client.
168+
pub connect: Connect,
169+
/// A port to send back any errors from the rsync.
170+
pub result: OncePortRef<Result<(), String>>,
171+
}
172+
161173
#[derive(Debug, Named, Serialize, Deserialize)]
162174
pub struct RsyncParams {
163175
pub workspace: WorkspaceLocation,
164176
}
165177

166178
#[derive(Debug)]
167-
#[hyperactor::export(
168-
spawn = true,
169-
handlers = [Connect { cast = true }],
170-
)]
179+
#[hyperactor::export(spawn = true, handlers = [RsyncMessage { cast = true }])]
171180
pub struct RsyncActor {
172-
daemon: RsyncDaemon,
181+
workspace: WorkspaceLocation,
182+
//daemon: RsyncDaemon,
173183
}
174184

175185
#[async_trait]
176186
impl Actor for RsyncActor {
177187
type Params = RsyncParams;
178188

179189
async fn new(RsyncParams { workspace }: Self::Params) -> Result<Self> {
180-
let workspace = workspace.resolve()?;
181-
let daemon = RsyncDaemon::spawn(TcpListener::bind(("::1", 0)).await?, &workspace).await?;
182-
Ok(Self { daemon })
190+
Ok(Self { workspace })
183191
}
184192
}
185193

186194
#[async_trait]
187-
impl Handler<Connect> for RsyncActor {
195+
impl Handler<RsyncMessage> for RsyncActor {
188196
async fn handle(
189197
&mut self,
190198
cx: &hyperactor::Context<Self>,
191-
message: Connect,
199+
RsyncMessage { connect, result }: RsyncMessage,
192200
) -> Result<(), anyhow::Error> {
193-
let (mut local, mut stream) = try_join!(
194-
async { Ok(TcpStream::connect(self.daemon.addr()).await?) },
195-
accept(cx, cx.self_id().clone(), message),
196-
)?;
197-
tokio::io::copy_bidirectional(&mut local, &mut stream).await?;
201+
let res = async {
202+
let workspace = self.workspace.resolve()?;
203+
let (listener, mut stream) = try_join!(
204+
async { anyhow::Ok(TcpListener::bind(("::1", 0)).await?) },
205+
accept(cx, cx.self_id().clone(), connect),
206+
)?;
207+
let addr = listener.local_addr()?;
208+
try_join!(
209+
async move { do_rsync(&addr, &workspace).await },
210+
async move {
211+
let mut local = listener.accept().await?.0;
212+
tokio::io::copy_bidirectional(&mut stream, &mut local).await?;
213+
anyhow::Ok(())
214+
},
215+
)?;
216+
anyhow::Ok(())
217+
}
218+
.await;
219+
result.send(cx, res.map_err(|e| format!("{:#?}", e)))?;
198220
Ok(())
199221
}
200222
}
@@ -203,45 +225,63 @@ pub async fn rsync_mesh<M>(actor_mesh: &M, workspace: PathBuf) -> Result<()>
203225
where
204226
M: ActorMesh<Actor = RsyncActor>,
205227
{
228+
// Spawn a rsync daemon to acceopt incoming connections from actors.
229+
let daemon = RsyncDaemon::spawn(TcpListener::bind(("::1", 0)).await?, &workspace).await?;
230+
let daemon_addr = daemon.addr();
231+
206232
// We avoid casting here as we need point-to-point connections to each individual actor.
207233
stream::iter(actor_mesh.iter_actor_refs())
234+
.map(anyhow::Ok)
208235
// Connect to all actors in the mesh.
209-
.map(|actor| async move {
236+
.try_for_each_concurrent(None, |actor| async move {
210237
let mailbox = actor_mesh.proc_mesh().client();
211238
let (connect, completer) = Connect::allocate(mailbox.actor_id().clone(), mailbox);
212-
actor.send(mailbox, connect)?;
213-
completer.complete().await
214-
})
215-
// Max the connections run in parallel.
216-
.buffer_unordered(usize::MAX)
217-
// Initiate the rsync, in parallel.
218-
.try_for_each_concurrent(None, |mut local| async {
219-
let workspace = workspace.clone();
220-
let listener = TcpListener::bind(("::1", 0)).await?;
221-
let addr = listener.local_addr()?;
222-
try_join!(
223-
async move { do_rsync(&addr, &workspace).await },
224-
async move {
225-
let (mut stream, _) = listener.accept().await?;
226-
tokio::io::copy_bidirectional(&mut stream, &mut local).await?;
227-
anyhow::Ok(())
239+
let (tx, rx) = mailbox.open_once_port::<Result<(), String>>();
240+
actor.send(
241+
mailbox,
242+
RsyncMessage {
243+
connect,
244+
result: tx.bind(),
228245
},
229246
)?;
247+
let (mut local, mut stream) = try_join!(
248+
TcpStream::connect(daemon_addr.clone()).map_err(anyhow::Error::msg),
249+
completer.complete(),
250+
)?;
251+
// Pipe the remote rsync client to the local rsync server, but don't propagate failures yet.
252+
let copy_res = tokio::io::copy_bidirectional(&mut local, &mut stream).await;
253+
// Now wait for the final result to be sent back. We wrap in a timeout, as we should get this
254+
// back pretty quickly after the copy above is done.
255+
let () = RealClock
256+
.timeout(Duration::from_secs(1), rx.recv())
257+
.await??
258+
.map_err(anyhow::Error::msg)
259+
.with_context(|| format!("actor: {}", actor.actor_id()))?;
260+
// Finally, propagate any copy errors, in case there were some but not result error.
261+
let _ = copy_res?;
230262
anyhow::Ok(())
231263
})
232264
.await?;
265+
266+
daemon.shutdown().await?;
267+
233268
Ok(())
234269
}
235270

236271
#[cfg(test)]
237272
mod tests {
238273
use anyhow::Result;
239274
use anyhow::anyhow;
275+
use ndslice::shape;
240276
use tempfile::TempDir;
241277
use tokio::fs;
242278
use tokio::net::TcpListener;
243279

244280
use super::*;
281+
use crate::alloc::AllocSpec;
282+
use crate::alloc::Allocator;
283+
use crate::alloc::local::LocalAllocator;
284+
use crate::proc_mesh::ProcMesh;
245285

246286
#[tokio::test]
247287
async fn test_simple() -> Result<()> {
@@ -259,4 +299,46 @@ mod tests {
259299

260300
Ok(())
261301
}
302+
303+
#[tokio::test]
304+
async fn test_rsync_actor_and_mesh() -> Result<()> {
305+
// Create source workspace with test files
306+
let source_workspace = TempDir::new()?;
307+
fs::write(source_workspace.path().join("test1.txt"), "content1").await?;
308+
fs::write(source_workspace.path().join("test2.txt"), "content2").await?;
309+
fs::create_dir(source_workspace.path().join("subdir")).await?;
310+
fs::write(source_workspace.path().join("subdir/test3.txt"), "content3").await?;
311+
312+
// Create target workspace for the actors
313+
let target_workspace = TempDir::new()?;
314+
315+
// Set up actor mesh with 2 RsyncActors
316+
let alloc = LocalAllocator
317+
.allocate(AllocSpec {
318+
shape: shape! { replica = 1 },
319+
constraints: Default::default(),
320+
})
321+
.await?;
322+
323+
let proc_mesh = ProcMesh::allocate(alloc).await?;
324+
325+
// Create RsyncParams - all actors will use the same target workspace for this test
326+
let params = RsyncParams {
327+
workspace: WorkspaceLocation::Constant(target_workspace.path().to_path_buf()),
328+
};
329+
330+
// Spawn actor mesh with RsyncActors
331+
let actor_mesh = proc_mesh.spawn::<RsyncActor>("rsync_test", &params).await?;
332+
333+
// Test rsync_mesh function - this coordinates rsync operations across the mesh
334+
rsync_mesh(&actor_mesh, source_workspace.path().to_path_buf()).await?;
335+
336+
// Verify we copied correctly.
337+
assert!(
338+
!dir_diff::is_different(&source_workspace, &target_workspace)
339+
.map_err(|e| anyhow!("{:?}", e))?
340+
);
341+
342+
Ok(())
343+
}
262344
}

0 commit comments

Comments
 (0)