@@ -15,23 +15,23 @@ use std::time::Duration;
15
15
16
16
use anyhow:: Context ;
17
17
use anyhow:: Result ;
18
- use anyhow:: anyhow;
19
18
use anyhow:: bail;
20
19
use anyhow:: ensure;
21
20
use async_trait:: async_trait;
21
+ use futures:: FutureExt ;
22
22
use futures:: StreamExt ;
23
23
use futures:: TryFutureExt ;
24
24
use futures:: TryStreamExt ;
25
- use futures:: stream;
26
25
use futures:: try_join;
27
26
use hyperactor:: Actor ;
28
27
use hyperactor:: Bind ;
29
28
use hyperactor:: Handler ;
30
29
use hyperactor:: Named ;
31
- use hyperactor:: OncePortRef ;
30
+ use hyperactor:: PortRef ;
32
31
use hyperactor:: Unbind ;
33
32
use hyperactor:: clock:: Clock ;
34
33
use hyperactor:: clock:: RealClock ;
34
+ use ndslice:: Selection ;
35
35
use nix:: sys:: signal;
36
36
use nix:: sys:: signal:: Signal ;
37
37
use nix:: unistd:: Pid ;
@@ -48,6 +48,7 @@ use crate::actor_mesh::ActorMesh;
48
48
use crate :: code_sync:: WorkspaceLocation ;
49
49
use crate :: connect:: Connect ;
50
50
use crate :: connect:: accept;
51
+ use crate :: sel;
51
52
52
53
/// Represents a single file change from rsync
53
54
#[ derive( Debug , Clone , PartialEq , Eq , Serialize , Deserialize ) ]
@@ -281,9 +282,9 @@ impl RsyncDaemon {
281
282
#[ derive( Debug , Named , Serialize , Deserialize , Bind , Unbind ) ]
282
283
pub struct RsyncMessage {
283
284
/// The connect message to create a duplex bytestream with the client.
284
- pub connect : Connect ,
285
+ pub connect : PortRef < Connect > ,
285
286
/// A port to send back the rsync result or any errors.
286
- pub result : OncePortRef < Result < RsyncResult , String > > ,
287
+ pub result : PortRef < Result < RsyncResult , String > > ,
287
288
}
288
289
289
290
#[ derive( Debug , Named , Serialize , Deserialize ) ]
@@ -315,9 +316,11 @@ impl Handler<RsyncMessage> for RsyncActor {
315
316
) -> Result < ( ) , anyhow:: Error > {
316
317
let res = async {
317
318
let workspace = self . workspace . resolve ( ) ?;
319
+ let ( connect_msg, completer) = Connect :: allocate ( cx. self_id ( ) . clone ( ) , cx) ;
320
+ connect. send ( cx, connect_msg) ?;
318
321
let ( listener, mut stream) = try_join ! (
319
322
TcpListener :: bind( ( "::1" , 0 ) ) . err_into( ) ,
320
- accept ( cx , cx . self_id ( ) . clone ( ) , connect ) ,
323
+ completer . complete ( ) ,
321
324
) ?;
322
325
let addr = listener. local_addr ( ) ?;
323
326
let ( rsync_result, _) = try_join ! ( do_rsync( & addr, & workspace) , async move {
@@ -337,43 +340,43 @@ pub async fn rsync_mesh<M>(actor_mesh: &M, workspace: PathBuf) -> Result<Vec<Rsy
337
340
where
338
341
M : ActorMesh < Actor = RsyncActor > ,
339
342
{
340
- // Spawn a rsync daemon to acceopt incoming connections from actors.
343
+ // Spawn a rsync daemon to accept incoming connections from actors.
341
344
let daemon = RsyncDaemon :: spawn ( TcpListener :: bind ( ( "::1" , 0 ) ) . await ?, & workspace) . await ?;
342
345
let daemon_addr = daemon. addr ( ) ;
343
346
344
- // We avoid casting here as we need point-to-point connections to each individual actor.
345
- let results: Vec < RsyncResult > = stream:: iter ( actor_mesh. iter_actor_refs ( ) )
346
- . map ( anyhow:: Ok )
347
- // Connect to all actors in the mesh.
348
- . try_fold ( Vec :: new ( ) , |mut results, actor| async move {
349
- let mailbox = actor_mesh. proc_mesh ( ) . client ( ) ;
350
- let ( connect, completer) = Connect :: allocate ( mailbox. actor_id ( ) . clone ( ) , mailbox) ;
351
- let ( tx, rx) = mailbox. open_once_port :: < Result < RsyncResult , String > > ( ) ;
352
- actor. send (
353
- mailbox,
347
+ let mailbox = actor_mesh. proc_mesh ( ) . client ( ) ;
348
+ let ( rsync_conns_tx, rsync_conns_rx) = mailbox. open_port :: < Connect > ( ) ;
349
+
350
+ let ( ( ) , results) = try_join ! (
351
+ rsync_conns_rx
352
+ . take( actor_mesh. shape( ) . slice( ) . len( ) )
353
+ . err_into:: <anyhow:: Error >( )
354
+ . try_for_each_concurrent( None , |connect| async move {
355
+ let ( mut local, mut stream) = try_join!(
356
+ TcpStream :: connect( daemon_addr. clone( ) ) . err_into( ) ,
357
+ accept( mailbox, mailbox. actor_id( ) . clone( ) , connect) ,
358
+ ) ?;
359
+ tokio:: io:: copy_bidirectional( & mut local, & mut stream) . await ?;
360
+ anyhow:: Ok ( ( ) )
361
+ } )
362
+ . boxed( ) ,
363
+ async move {
364
+ let ( result_tx, result_rx) = mailbox. open_port:: <Result <RsyncResult , String >>( ) ;
365
+ actor_mesh. cast(
366
+ sel!( * ) ,
354
367
RsyncMessage {
355
- connect,
356
- result : tx . bind ( ) ,
368
+ connect: rsync_conns_tx . bind ( ) ,
369
+ result: result_tx . bind( ) ,
357
370
} ,
358
371
) ?;
359
- let ( mut local, mut stream) = try_join ! (
360
- TcpStream :: connect( daemon_addr. clone( ) ) . err_into( ) ,
361
- completer. complete( ) ,
362
- ) ?;
363
- // Pipe the remote rsync client to the local rsync server, but don't propagate failures yet.
364
- let copy_res = tokio:: io:: copy_bidirectional ( & mut local, & mut stream) . await ;
365
- // Now wait for the final result to be sent back. We wrap in a timeout, as we should get this
366
- // back pretty quickly after the copy above is done.
367
- let rsync_result = RealClock
368
- . timeout ( Duration :: from_secs ( 1 ) , rx. recv ( ) )
369
- . await ??
370
- . map_err ( |err| anyhow ! ( "failure from {}: {}" , actor. actor_id( ) , err) ) ?;
371
- // Finally, propagate any copy errors, in case there were some but not result error.
372
- let _ = copy_res?;
373
- results. push ( rsync_result) ;
374
- anyhow:: Ok ( results)
375
- } )
376
- . await ?;
372
+ let res: Vec <RsyncResult > = result_rx
373
+ . take( actor_mesh. shape( ) . slice( ) . len( ) )
374
+ . map( |res| res?. map_err( anyhow:: Error :: msg) )
375
+ . try_collect( )
376
+ . await ?;
377
+ anyhow:: Ok ( res)
378
+ } ,
379
+ ) ?;
377
380
378
381
daemon. shutdown ( ) . await ?;
379
382
0 commit comments