@@ -19,12 +19,16 @@ use anyhow::bail;
19
19
use anyhow:: ensure;
20
20
use async_trait:: async_trait;
21
21
use futures:: StreamExt ;
22
+ use futures:: TryFutureExt ;
22
23
use futures:: TryStreamExt ;
23
24
use futures:: stream;
24
25
use futures:: try_join;
25
26
use hyperactor:: Actor ;
27
+ use hyperactor:: Bind ;
26
28
use hyperactor:: Handler ;
27
29
use hyperactor:: Named ;
30
+ use hyperactor:: OncePortRef ;
31
+ use hyperactor:: Unbind ;
28
32
use hyperactor:: clock:: Clock ;
29
33
use hyperactor:: clock:: RealClock ;
30
34
use nix:: sys:: signal;
@@ -56,8 +60,8 @@ pub async fn do_rsync(addr: &SocketAddr, workspace: &Path) -> Result<()> {
56
60
. arg ( "--delay-updates" )
57
61
. arg ( "--exclude=.rsync-tmp.*" )
58
62
. arg ( format ! ( "--partial-dir=.rsync-tmp.{}" , addr. port( ) ) )
59
- . arg ( format ! ( "{}/" , workspace. display( ) ) )
60
63
. arg ( format ! ( "rsync://{}/workspace" , addr) )
64
+ . arg ( format ! ( "{}/" , workspace. display( ) ) )
61
65
. stderr ( Stdio :: piped ( ) )
62
66
. output ( )
63
67
. await ?;
@@ -89,8 +93,8 @@ impl RsyncDaemon {
89
93
path = {workspace}
90
94
use chroot = no
91
95
list = no
92
- read only = false
93
- write only = true
96
+ read only = true
97
+ write only = false
94
98
uid = {uid}
95
99
hosts allow = localhost
96
100
"# ,
@@ -158,43 +162,58 @@ impl RsyncDaemon {
158
162
}
159
163
}
160
164
165
+ #[ derive( Debug , Named , Serialize , Deserialize , Bind , Unbind ) ]
166
+ pub struct RsyncMessage {
167
+ /// The `Connect` handshake the actor accepts to open a duplex bytestream with the sender.
168
+ pub connect : Connect ,
169
+ /// One-shot port on which the actor reports the rsync outcome: `Ok(())`, or the error rendered as a string.
170
+ pub result : OncePortRef < Result < ( ) , String > > ,
171
+ }
172
+
161
173
/// Spawn parameters for `RsyncActor` (used as its `Actor::Params`).
#[ derive( Debug , Named , Serialize , Deserialize ) ]
162
174
pub struct RsyncParams {
/// Location of the workspace the actor syncs into; resolved to a path when a sync request is handled.
163
175
pub workspace : WorkspaceLocation ,
164
176
}
165
177
166
178
/// Actor that, on each `RsyncMessage`, runs an rsync client against a bytestream forwarded from the sender.
#[ derive( Debug ) ]
167
- #[ hyperactor:: export(
168
- spawn = true ,
169
- handlers = [ Connect { cast = true } ] ,
170
- ) ]
179
+ #[ hyperactor:: export( spawn = true , handlers = [ RsyncMessage { cast = true } ] ) ]
171
180
pub struct RsyncActor {
172
- daemon : RsyncDaemon ,
181
+ workspace : WorkspaceLocation ,
182
+ // NOTE(review): leftover commented-out field (`daemon: RsyncDaemon`); the daemon is now spawned in `rsync_mesh`, so this line can be deleted.
173
183
}
174
184
175
185
#[ async_trait]
176
186
impl Actor for RsyncActor {
177
187
type Params = RsyncParams ;
178
188
/// Stores the workspace location as-is. It is no longer resolved here, and no daemon is spawned here;
/// resolution now happens in the `RsyncMessage` handler.
179
189
async fn new ( RsyncParams { workspace } : Self :: Params ) -> Result < Self > {
180
- let workspace = workspace. resolve ( ) ?;
181
- let daemon = RsyncDaemon :: spawn ( TcpListener :: bind ( ( "::1" , 0 ) ) . await ?, & workspace) . await ?;
182
- Ok ( Self { daemon } )
190
+ Ok ( Self { workspace } )
183
191
}
184
192
}
185
193
186
194
#[ async_trait]
187
- impl Handler < Connect > for RsyncActor {
195
+ impl Handler < RsyncMessage > for RsyncActor {
/// Handles one sync request: resolves the workspace, runs `do_rsync` against a local loopback
/// listener whose accepted connection is piped over the duplex stream opened via `connect`,
/// and reports the outcome on `result`.
///
/// Sync failures are sent back to the requester rather than returned; the handler itself only
/// fails if sending that result fails.
188
196
async fn handle (
189
197
& mut self ,
190
198
cx : & hyperactor:: Context < Self > ,
191
- message : Connect ,
199
+ RsyncMessage { connect , result } : RsyncMessage ,
192
200
) -> Result < ( ) , anyhow:: Error > {
193
- let ( mut local, mut stream) = try_join ! (
194
- async { Ok ( TcpStream :: connect( self . daemon. addr( ) ) . await ?) } ,
195
- accept( cx, cx. self_id( ) . clone( ) , message) ,
196
- ) ?;
197
- tokio:: io:: copy_bidirectional ( & mut local, & mut stream) . await ?;
// Run the whole sync inside one async block so every failure lands in `res` and can be reported.
201
+ let res = async {
202
+ let workspace = self . workspace . resolve ( ) ?;
// Concurrently bind an ephemeral loopback port and accept the duplex stream from the sender.
203
+ let ( listener, mut stream) = try_join ! (
204
+ TcpListener :: bind( ( "::1" , 0 ) ) . err_into( ) ,
205
+ accept( cx, cx. self_id( ) . clone( ) , connect) ,
206
+ ) ?;
207
+ let addr = listener. local_addr ( ) ?;
// Point rsync at the local listener while forwarding that connection's bytes over the stream.
208
+ try_join ! ( do_rsync( & addr, & workspace) , async move {
209
+ let ( mut local, _) = listener. accept( ) . await ?;
210
+ tokio:: io:: copy_bidirectional( & mut stream, & mut local) . await ?;
211
+ anyhow:: Ok ( ( ) )
212
+ } , ) ?;
213
+ anyhow:: Ok ( ( ) )
214
+ }
215
+ . await ;
// NOTE(review): `{:#?}` Debug-formats the anyhow error; `{:#}` would render the cause chain on
// one line, which may read better in the requester's error message. Confirm which is intended.
216
+ result. send ( cx, res. map_err ( |e| format ! ( "{:#?}" , e) ) ) ?;
198
217
Ok ( ( ) )
199
218
}
200
219
}
@@ -203,45 +222,63 @@ pub async fn rsync_mesh<M>(actor_mesh: &M, workspace: PathBuf) -> Result<()>
203
222
where
204
223
M : ActorMesh < Actor = RsyncActor > ,
205
224
{
225
+ // Spawn an rsync daemon serving `workspace` to accept incoming connections from actors.
226
+ let daemon = RsyncDaemon :: spawn ( TcpListener :: bind ( ( "::1" , 0 ) ) . await ?, & workspace) . await ?;
227
+ let daemon_addr = daemon. addr ( ) ;
228
+
206
229
// We avoid casting here as we need point-to-point connections to each individual actor.
207
230
stream:: iter ( actor_mesh. iter_actor_refs ( ) )
231
+ . map ( anyhow:: Ok )
208
232
// Connect to every actor in the mesh and drive its sync, with no concurrency limit.
209
- . map ( |actor| async move {
233
+ . try_for_each_concurrent ( None , |actor| async move {
210
234
let mailbox = actor_mesh. proc_mesh ( ) . client ( ) ;
211
235
let ( connect, completer) = Connect :: allocate ( mailbox. actor_id ( ) . clone ( ) , mailbox) ;
212
- actor. send ( mailbox, connect) ?;
213
- completer. complete ( ) . await
214
- } )
215
- // Max the connections run in parallel.
216
- . buffer_unordered ( usize:: MAX )
217
- // Initiate the rsync, in parallel.
218
- . try_for_each_concurrent ( None , |mut local| async {
219
- let workspace = workspace. clone ( ) ;
220
- let listener = TcpListener :: bind ( ( "::1" , 0 ) ) . await ?;
221
- let addr = listener. local_addr ( ) ?;
222
- try_join ! (
223
- async move { do_rsync( & addr, & workspace) . await } ,
224
- async move {
225
- let ( mut stream, _) = listener. accept( ) . await ?;
226
- tokio:: io:: copy_bidirectional( & mut stream, & mut local) . await ?;
227
- anyhow:: Ok ( ( ) )
236
+ let ( tx, rx) = mailbox. open_once_port :: < Result < ( ) , String > > ( ) ;
237
+ actor. send (
238
+ mailbox,
239
+ RsyncMessage {
240
+ connect,
241
+ result : tx. bind ( ) ,
228
242
} ,
229
243
) ?;
244
+ let ( mut local, mut stream) = try_join ! (
245
+ TcpStream :: connect( daemon_addr. clone( ) ) . err_into( ) ,
246
+ completer. complete( ) ,
247
+ ) ?;
248
+ // Pipe the remote rsync client to the local rsync server, but don't propagate failures yet.
249
+ let copy_res = tokio:: io:: copy_bidirectional ( & mut local, & mut stream) . await ;
250
+ // Now wait for the actor to report its final result. This is wrapped in a 1-second timeout,
251
+ // as the report should arrive shortly after the copy above is done.
252
+ let ( ) = RealClock
253
+ . timeout ( Duration :: from_secs ( 1 ) , rx. recv ( ) )
254
+ . await ??
255
+ . map_err ( anyhow:: Error :: msg)
256
+ . with_context ( || format ! ( "actor: {}" , actor. actor_id( ) ) ) ?;
257
+ // Finally, propagate any copy error, in case the copy failed but the actor reported success.
258
+ let _ = copy_res?;
230
259
anyhow:: Ok ( ( ) )
231
260
} )
232
261
. await ?;
262
+
// NOTE(review): if any sync above fails, the `?` returns early and this shutdown is skipped.
// Confirm that dropping `RsyncDaemon` also stops the daemon.
263
+ daemon. shutdown ( ) . await ?;
264
+
233
265
Ok ( ( ) )
234
266
}
235
267
236
268
#[ cfg( test) ]
237
269
mod tests {
238
270
use anyhow:: Result ;
239
271
use anyhow:: anyhow;
272
+ use ndslice:: shape;
240
273
use tempfile:: TempDir ;
241
274
use tokio:: fs;
242
275
use tokio:: net:: TcpListener ;
243
276
244
277
use super :: * ;
278
+ use crate :: alloc:: AllocSpec ;
279
+ use crate :: alloc:: Allocator ;
280
+ use crate :: alloc:: local:: LocalAllocator ;
281
+ use crate :: proc_mesh:: ProcMesh ;
245
282
246
283
#[ tokio:: test]
247
284
async fn test_simple ( ) -> Result < ( ) > {
@@ -259,4 +296,46 @@ mod tests {
259
296
260
297
Ok ( ( ) )
261
298
}
299
+
300
+ #[ tokio:: test]
301
+ async fn test_rsync_actor_and_mesh ( ) -> Result < ( ) > {
302
+ // Create a source workspace with test files, including one in a nested directory.
303
+ let source_workspace = TempDir :: new ( ) ?;
304
+ fs:: write ( source_workspace. path ( ) . join ( "test1.txt" ) , "content1" ) . await ?;
305
+ fs:: write ( source_workspace. path ( ) . join ( "test2.txt" ) , "content2" ) . await ?;
306
+ fs:: create_dir ( source_workspace. path ( ) . join ( "subdir" ) ) . await ?;
307
+ fs:: write ( source_workspace. path ( ) . join ( "subdir/test3.txt" ) , "content3" ) . await ?;
308
+
309
+ // Create the (initially empty) target workspace the actor will sync into.
310
+ let target_workspace = TempDir :: new ( ) ?;
311
+
312
+ // Allocate a local mesh; the shape below is `replica = 1`, i.e. a single replica (not 2).
313
+ let alloc = LocalAllocator
314
+ . allocate ( AllocSpec {
315
+ shape : shape ! { replica = 1 } ,
316
+ constraints : Default :: default ( ) ,
317
+ } )
318
+ . await ?;
319
+
320
+ let proc_mesh = ProcMesh :: allocate ( alloc) . await ?;
321
+
322
+ // Create RsyncParams - the spawned RsyncActor uses the target workspace as a constant location.
323
+ let params = RsyncParams {
324
+ workspace : WorkspaceLocation :: Constant ( target_workspace. path ( ) . to_path_buf ( ) ) ,
325
+ } ;
326
+
327
+ // Spawn the RsyncActor mesh on the proc mesh.
328
+ let actor_mesh = proc_mesh. spawn :: < RsyncActor > ( "rsync_test" , & params) . await ?;
329
+
330
+ // Exercise rsync_mesh: sync the source workspace out through the mesh.
331
+ rsync_mesh ( & actor_mesh, source_workspace. path ( ) . to_path_buf ( ) ) . await ?;
332
+
333
+ // Verify the target directory tree now matches the source.
334
+ assert ! (
335
+ !dir_diff:: is_different( & source_workspace, & target_workspace)
336
+ . map_err( |e| anyhow!( "{:?}" , e) ) ?
337
+ ) ;
338
+
339
+ Ok ( ( ) )
340
+ }
262
341
}
0 commit comments