@@ -15,16 +15,21 @@ use std::time::Duration;
15
15
16
16
use anyhow:: Context ;
17
17
use anyhow:: Result ;
18
+ use anyhow:: anyhow;
18
19
use anyhow:: bail;
19
20
use anyhow:: ensure;
20
21
use async_trait:: async_trait;
21
22
use futures:: StreamExt ;
23
+ use futures:: TryFutureExt ;
22
24
use futures:: TryStreamExt ;
23
25
use futures:: stream;
24
26
use futures:: try_join;
25
27
use hyperactor:: Actor ;
28
+ use hyperactor:: Bind ;
26
29
use hyperactor:: Handler ;
27
30
use hyperactor:: Named ;
31
+ use hyperactor:: OncePortRef ;
32
+ use hyperactor:: Unbind ;
28
33
use hyperactor:: clock:: Clock ;
29
34
use hyperactor:: clock:: RealClock ;
30
35
use nix:: sys:: signal;
@@ -56,8 +61,8 @@ pub async fn do_rsync(addr: &SocketAddr, workspace: &Path) -> Result<()> {
56
61
. arg ( "--delay-updates" )
57
62
. arg ( "--exclude=.rsync-tmp.*" )
58
63
. arg ( format ! ( "--partial-dir=.rsync-tmp.{}" , addr. port( ) ) )
59
- . arg ( format ! ( "{}/" , workspace. display( ) ) )
60
64
. arg ( format ! ( "rsync://{}/workspace" , addr) )
65
+ . arg ( format ! ( "{}/" , workspace. display( ) ) )
61
66
. stderr ( Stdio :: piped ( ) )
62
67
. output ( )
63
68
. await ?;
@@ -89,8 +94,8 @@ impl RsyncDaemon {
89
94
path = {workspace}
90
95
use chroot = no
91
96
list = no
92
- read only = false
93
- write only = true
97
+ read only = true
98
+ write only = false
94
99
uid = {uid}
95
100
hosts allow = localhost
96
101
"# ,
@@ -158,43 +163,58 @@ impl RsyncDaemon {
158
163
}
159
164
}
160
165
166
+ #[ derive( Debug , Named , Serialize , Deserialize , Bind , Unbind ) ]
167
+ pub struct RsyncMessage {
168
+ /// The connect message to create a duplex bytestream with the client.
169
+ pub connect : Connect ,
170
+ /// A port to send back any errors from the rsync.
171
+ pub result : OncePortRef < Result < ( ) , String > > ,
172
+ }
173
+
161
174
#[ derive( Debug , Named , Serialize , Deserialize ) ]
162
175
pub struct RsyncParams {
163
176
pub workspace : WorkspaceLocation ,
164
177
}
165
178
166
179
#[ derive( Debug ) ]
167
- #[ hyperactor:: export(
168
- spawn = true ,
169
- handlers = [ Connect { cast = true } ] ,
170
- ) ]
180
+ #[ hyperactor:: export( spawn = true , handlers = [ RsyncMessage { cast = true } ] ) ]
171
181
pub struct RsyncActor {
172
- daemon : RsyncDaemon ,
182
+ workspace : WorkspaceLocation ,
183
+ //daemon: RsyncDaemon,
173
184
}
174
185
175
186
#[ async_trait]
176
187
impl Actor for RsyncActor {
177
188
type Params = RsyncParams ;
178
189
179
190
async fn new ( RsyncParams { workspace } : Self :: Params ) -> Result < Self > {
180
- let workspace = workspace. resolve ( ) ?;
181
- let daemon = RsyncDaemon :: spawn ( TcpListener :: bind ( ( "::1" , 0 ) ) . await ?, & workspace) . await ?;
182
- Ok ( Self { daemon } )
191
+ Ok ( Self { workspace } )
183
192
}
184
193
}
185
194
186
195
#[ async_trait]
187
- impl Handler < Connect > for RsyncActor {
196
+ impl Handler < RsyncMessage > for RsyncActor {
188
197
async fn handle (
189
198
& mut self ,
190
199
cx : & hyperactor:: Context < Self > ,
191
- message : Connect ,
200
+ RsyncMessage { connect , result } : RsyncMessage ,
192
201
) -> Result < ( ) , anyhow:: Error > {
193
- let ( mut local, mut stream) = try_join ! (
194
- async { Ok ( TcpStream :: connect( self . daemon. addr( ) ) . await ?) } ,
195
- accept( cx, cx. self_id( ) . clone( ) , message) ,
196
- ) ?;
197
- tokio:: io:: copy_bidirectional ( & mut local, & mut stream) . await ?;
202
+ let res = async {
203
+ let workspace = self . workspace . resolve ( ) ?;
204
+ let ( listener, mut stream) = try_join ! (
205
+ TcpListener :: bind( ( "::1" , 0 ) ) . err_into( ) ,
206
+ accept( cx, cx. self_id( ) . clone( ) , connect) ,
207
+ ) ?;
208
+ let addr = listener. local_addr ( ) ?;
209
+ try_join ! ( do_rsync( & addr, & workspace) , async move {
210
+ let ( mut local, _) = listener. accept( ) . await ?;
211
+ tokio:: io:: copy_bidirectional( & mut stream, & mut local) . await ?;
212
+ anyhow:: Ok ( ( ) )
213
+ } , ) ?;
214
+ anyhow:: Ok ( ( ) )
215
+ }
216
+ . await ;
217
+ result. send ( cx, res. map_err ( |e| format ! ( "{:#?}" , e) ) ) ?;
198
218
Ok ( ( ) )
199
219
}
200
220
}
@@ -203,45 +223,62 @@ pub async fn rsync_mesh<M>(actor_mesh: &M, workspace: PathBuf) -> Result<()>
203
223
where
204
224
M : ActorMesh < Actor = RsyncActor > ,
205
225
{
226
+ // Spawn a rsync daemon to acceopt incoming connections from actors.
227
+ let daemon = RsyncDaemon :: spawn ( TcpListener :: bind ( ( "::1" , 0 ) ) . await ?, & workspace) . await ?;
228
+ let daemon_addr = daemon. addr ( ) ;
229
+
206
230
// We avoid casting here as we need point-to-point connections to each individual actor.
207
231
stream:: iter ( actor_mesh. iter_actor_refs ( ) )
232
+ . map ( anyhow:: Ok )
208
233
// Connect to all actors in the mesh.
209
- . map ( |actor| async move {
234
+ . try_for_each_concurrent ( None , |actor| async move {
210
235
let mailbox = actor_mesh. proc_mesh ( ) . client ( ) ;
211
236
let ( connect, completer) = Connect :: allocate ( mailbox. actor_id ( ) . clone ( ) , mailbox) ;
212
- actor. send ( mailbox, connect) ?;
213
- completer. complete ( ) . await
214
- } )
215
- // Max the connections run in parallel.
216
- . buffer_unordered ( usize:: MAX )
217
- // Initiate the rsync, in parallel.
218
- . try_for_each_concurrent ( None , |mut local| async {
219
- let workspace = workspace. clone ( ) ;
220
- let listener = TcpListener :: bind ( ( "::1" , 0 ) ) . await ?;
221
- let addr = listener. local_addr ( ) ?;
222
- try_join ! (
223
- async move { do_rsync( & addr, & workspace) . await } ,
224
- async move {
225
- let ( mut stream, _) = listener. accept( ) . await ?;
226
- tokio:: io:: copy_bidirectional( & mut stream, & mut local) . await ?;
227
- anyhow:: Ok ( ( ) )
237
+ let ( tx, rx) = mailbox. open_once_port :: < Result < ( ) , String > > ( ) ;
238
+ actor. send (
239
+ mailbox,
240
+ RsyncMessage {
241
+ connect,
242
+ result : tx. bind ( ) ,
228
243
} ,
229
244
) ?;
245
+ let ( mut local, mut stream) = try_join ! (
246
+ TcpStream :: connect( daemon_addr. clone( ) ) . err_into( ) ,
247
+ completer. complete( ) ,
248
+ ) ?;
249
+ // Pipe the remote rsync client to the local rsync server, but don't propagate failures yet.
250
+ let copy_res = tokio:: io:: copy_bidirectional ( & mut local, & mut stream) . await ;
251
+ // Now wait for the final result to be sent back. We wrap in a timeout, as we should get this
252
+ // back pretty quickly after the copy above is done.
253
+ let ( ) = RealClock
254
+ . timeout ( Duration :: from_secs ( 1 ) , rx. recv ( ) )
255
+ . await ??
256
+ . map_err ( |err| anyhow ! ( "failure from {}: {}" , actor. actor_id( ) , err) ) ?;
257
+ // Finally, propagate any copy errors, in case there were some but not result error.
258
+ let _ = copy_res?;
230
259
anyhow:: Ok ( ( ) )
231
260
} )
232
261
. await ?;
262
+
263
+ daemon. shutdown ( ) . await ?;
264
+
233
265
Ok ( ( ) )
234
266
}
235
267
236
268
#[ cfg( test) ]
237
269
mod tests {
238
270
use anyhow:: Result ;
239
271
use anyhow:: anyhow;
272
+ use ndslice:: shape;
240
273
use tempfile:: TempDir ;
241
274
use tokio:: fs;
242
275
use tokio:: net:: TcpListener ;
243
276
244
277
use super :: * ;
278
+ use crate :: alloc:: AllocSpec ;
279
+ use crate :: alloc:: Allocator ;
280
+ use crate :: alloc:: local:: LocalAllocator ;
281
+ use crate :: proc_mesh:: ProcMesh ;
245
282
246
283
#[ tokio:: test]
247
284
async fn test_simple ( ) -> Result < ( ) > {
@@ -259,4 +296,46 @@ mod tests {
259
296
260
297
Ok ( ( ) )
261
298
}
299
+
300
+ #[ tokio:: test]
301
+ async fn test_rsync_actor_and_mesh ( ) -> Result < ( ) > {
302
+ // Create source workspace with test files
303
+ let source_workspace = TempDir :: new ( ) ?;
304
+ fs:: write ( source_workspace. path ( ) . join ( "test1.txt" ) , "content1" ) . await ?;
305
+ fs:: write ( source_workspace. path ( ) . join ( "test2.txt" ) , "content2" ) . await ?;
306
+ fs:: create_dir ( source_workspace. path ( ) . join ( "subdir" ) ) . await ?;
307
+ fs:: write ( source_workspace. path ( ) . join ( "subdir/test3.txt" ) , "content3" ) . await ?;
308
+
309
+ // Create target workspace for the actors
310
+ let target_workspace = TempDir :: new ( ) ?;
311
+
312
+ // Set up actor mesh with 2 RsyncActors
313
+ let alloc = LocalAllocator
314
+ . allocate ( AllocSpec {
315
+ shape : shape ! { replica = 1 } ,
316
+ constraints : Default :: default ( ) ,
317
+ } )
318
+ . await ?;
319
+
320
+ let proc_mesh = ProcMesh :: allocate ( alloc) . await ?;
321
+
322
+ // Create RsyncParams - all actors will use the same target workspace for this test
323
+ let params = RsyncParams {
324
+ workspace : WorkspaceLocation :: Constant ( target_workspace. path ( ) . to_path_buf ( ) ) ,
325
+ } ;
326
+
327
+ // Spawn actor mesh with RsyncActors
328
+ let actor_mesh = proc_mesh. spawn :: < RsyncActor > ( "rsync_test" , & params) . await ?;
329
+
330
+ // Test rsync_mesh function - this coordinates rsync operations across the mesh
331
+ rsync_mesh ( & actor_mesh, source_workspace. path ( ) . to_path_buf ( ) ) . await ?;
332
+
333
+ // Verify we copied correctly.
334
+ assert ! (
335
+ !dir_diff:: is_different( & source_workspace, & target_workspace)
336
+ . map_err( |e| anyhow!( "{:?}" , e) ) ?
337
+ ) ;
338
+
339
+ Ok ( ( ) )
340
+ }
262
341
}
0 commit comments