@@ -33,12 +33,9 @@ use serde::Serialize;
33
33
use serde:: Serializer ;
34
34
use serde_with:: serde_as;
35
35
use tokio:: sync:: Mutex ;
36
- use tokio:: sync:: SetError ;
37
36
use tokio:: sync:: mpsc;
38
- use tokio:: sync:: mpsc:: Sender ;
39
37
use tokio:: sync:: mpsc:: UnboundedReceiver ;
40
38
use tokio:: sync:: mpsc:: UnboundedSender ;
41
- use tokio:: sync:: mpsc:: error:: SendError ;
42
39
use tokio:: task:: JoinError ;
43
40
use tokio:: task:: JoinHandle ;
44
41
use tokio:: time:: interval;
@@ -49,7 +46,6 @@ use crate::ActorId;
49
46
use crate :: Mailbox ;
50
47
use crate :: Named ;
51
48
use crate :: OncePortRef ;
52
- use crate :: WorldId ;
53
49
use crate :: channel;
54
50
use crate :: channel:: ChannelAddr ;
55
51
use crate :: channel:: Rx ;
@@ -313,18 +309,6 @@ pub enum SimNetError {
313
309
#[ error( "proxy not available: {0}" ) ]
314
310
ProxyNotAvailable ( String ) ,
315
311
316
- /// Unable to send message to the simulator.
317
- #[ error( transparent) ]
318
- OperationalMessageSendError ( #[ from] SendError < OperationalMessage > ) ,
319
-
320
- /// Setting the operational message sender which is already set.
321
- #[ error( transparent) ]
322
- OperationalMessageSenderSetError ( #[ from] SetError < Sender < OperationalMessage > > ) ,
323
-
324
- /// Missing OperationalMessageReceiver.
325
- #[ error( "missing operational message receiver" ) ]
326
- MissingOperationalMessageReceiver ,
327
-
328
312
/// Cannot deliver the message because destination address is missing.
329
313
#[ error( "missing destination address" ) ]
330
314
MissingDestinationAddress ,
@@ -361,8 +345,6 @@ pub struct SimNetHandle {
361
345
/// Handle to a running proxy server that forwards external messages
362
346
/// into the simnet.
363
347
proxy_handle : ProxyHandle ,
364
- /// A sender to forward simulator operational messages.
365
- operational_message_tx : UnboundedSender < OperationalMessage > ,
366
348
/// A receiver to receive simulator operational messages.
367
349
/// The receiver can be moved out of the simnet handle.
368
350
training_script_state_tx : tokio:: sync:: watch:: Sender < TrainingScriptState > ,
@@ -489,82 +471,6 @@ impl SimNetHandle {
489
471
490
472
pub ( crate ) type Topology = DashMap < SimNetEdge , SimNetEdgeInfo > ;
491
473
492
- /// The message to spawn a simulated mesh.
493
- #[ derive( Debug , Serialize , Deserialize , PartialEq , Clone ) ]
494
- pub struct SpawnMesh {
495
- /// The system address.
496
- pub system_addr : ChannelAddr ,
497
- /// The controller actor ID.
498
- pub controller_actor_id : ActorId ,
499
- /// The worker world.
500
- pub worker_world : WorldId ,
501
- }
502
-
503
- impl SpawnMesh {
504
- /// Creates a new SpawnMesh.
505
- pub fn new (
506
- system_addr : ChannelAddr ,
507
- controller_actor_id : ActorId ,
508
- worker_world : WorldId ,
509
- ) -> Self {
510
- Self {
511
- system_addr,
512
- controller_actor_id,
513
- worker_world,
514
- }
515
- }
516
- }
517
-
518
- /// An OperationalMessage is a message to control the simulator to do tasks such as
519
- /// spawning or killing actors.
520
- #[ derive( Debug , Serialize , Deserialize , PartialEq , Clone , Named ) ]
521
- pub enum OperationalMessage {
522
- /// Kill the world with given world_id.
523
- KillWorld ( String ) ,
524
- /// Spawn actors in a mesh.
525
- SpawnMesh ( SpawnMesh ) ,
526
- /// Update training script state.
527
- SetTrainingScriptState ( TrainingScriptState ) ,
528
- }
529
-
530
- /// Message Event that can be sent to the simulator.
531
- #[ derive( Debug ) ]
532
- pub struct SimOperation {
533
- /// Sender to send OperationalMessage to the simulator.
534
- operational_message_tx : UnboundedSender < OperationalMessage > ,
535
- operational_message : OperationalMessage ,
536
- }
537
-
538
- impl SimOperation {
539
- /// Creates a new SimOperation.
540
- pub fn new (
541
- operational_message_tx : UnboundedSender < OperationalMessage > ,
542
- operational_message : OperationalMessage ,
543
- ) -> Self {
544
- Self {
545
- operational_message_tx,
546
- operational_message,
547
- }
548
- }
549
- }
550
-
551
- #[ async_trait]
552
- impl Event for SimOperation {
553
- async fn handle ( & self ) -> Result < ( ) , SimNetError > {
554
- self . operational_message_tx
555
- . send ( self . operational_message . clone ( ) ) ?;
556
- Ok ( ( ) )
557
- }
558
-
559
- fn duration_ms ( & self ) -> u64 {
560
- 0
561
- }
562
-
563
- fn summary ( & self ) -> String {
564
- format ! ( "SimOperation: {:?}" , self . operational_message)
565
- }
566
- }
567
-
568
474
/// A ProxyMessage is a message that SimNet proxy receives.
569
475
/// The message may requests the SimNet to send the payload in the message field from
570
476
/// src to dst if addr field exists.
@@ -658,7 +564,6 @@ impl ProxyHandle {
658
564
proxy_addr : ChannelAddr ,
659
565
event_tx : UnboundedSender < ( Box < dyn Event > , bool , Option < SimulatorTimeInstant > ) > ,
660
566
pending_event_count : Arc < AtomicUsize > ,
661
- operational_message_tx : UnboundedSender < OperationalMessage > ,
662
567
) -> anyhow:: Result < Self > {
663
568
let ( addr, mut rx) = channel:: serve :: < MessageEnvelope > ( proxy_addr) . await ?;
664
569
tracing:: info!( "SimNet serving external traffic on {}" , & addr) ;
@@ -672,26 +577,18 @@ impl ProxyHandle {
672
577
#[ allow( clippy:: disallowed_methods) ]
673
578
if let Ok ( Ok ( msg) ) = timeout ( Duration :: from_millis ( 100 ) , rx. recv ( ) ) . await {
674
579
let proxy_message: ProxyMessage = msg. deserialized ( ) . unwrap ( ) ;
675
- let event : Box < dyn Event > = match proxy_message. dest_addr {
676
- Some ( dest_addr ) => Box :: new ( MessageDeliveryEvent :: new (
580
+ if let Some ( dest_addr ) = proxy_message. dest_addr {
581
+ let event = Box :: new ( MessageDeliveryEvent :: new (
677
582
proxy_message. sender_addr ,
678
583
dest_addr,
679
584
proxy_message. data ,
680
- ) ) ,
681
- None => {
682
- let operational_message: OperationalMessage =
683
- proxy_message. data . deserialized ( ) . unwrap ( ) ;
684
- Box :: new ( SimOperation :: new (
685
- operational_message_tx. clone ( ) ,
686
- operational_message,
687
- ) )
585
+ ) ) ;
586
+ if let Err ( e) = event_tx. send ( ( event, true , None ) ) {
587
+ tracing:: error!( "error sending message to simnet: {:?}" , e) ;
588
+ } else {
589
+ pending_event_count
590
+ . fetch_add ( 1 , std:: sync:: atomic:: Ordering :: SeqCst ) ;
688
591
}
689
- } ;
690
-
691
- if let Err ( e) = event_tx. send ( ( event, true , None ) ) {
692
- tracing:: error!( "error sending message to simnet: {:?}" , e) ;
693
- } else {
694
- pending_event_count. fetch_add ( 1 , std:: sync:: atomic:: Ordering :: SeqCst ) ;
695
592
}
696
593
}
697
594
if stop_signal. load ( Ordering :: SeqCst ) {
@@ -729,7 +626,7 @@ pub fn start(
729
626
private_addr : ChannelAddr ,
730
627
proxy_addr : ChannelAddr ,
731
628
max_duration_ms : u64 ,
732
- ) -> anyhow:: Result < UnboundedReceiver < OperationalMessage > > {
629
+ ) -> anyhow:: Result < ( ) > {
733
630
// Construct a topology with one node: the default A.
734
631
let address_book: DashSet < ChannelAddr > = DashSet :: new ( ) ;
735
632
address_book. insert ( private_addr. clone ( ) ) ;
@@ -775,14 +672,11 @@ pub fn start(
775
672
. await
776
673
} )
777
674
} ) ) ;
778
- let ( operational_message_tx, operational_message_rx) =
779
- mpsc:: unbounded_channel :: < OperationalMessage > ( ) ;
780
675
781
676
let proxy_handle = block_on ( ProxyHandle :: start (
782
677
proxy_addr,
783
678
event_tx. clone ( ) ,
784
679
pending_event_count. clone ( ) ,
785
- operational_message_tx. clone ( ) ,
786
680
) )
787
681
. map_err ( |err| SimNetError :: ProxyNotAvailable ( err. to_string ( ) ) ) ?;
788
682
@@ -792,12 +686,11 @@ pub fn start(
792
686
config,
793
687
pending_event_count,
794
688
proxy_handle,
795
- operational_message_tx,
796
689
training_script_state_tx,
797
690
stop_signal,
798
691
} ) ;
799
692
800
- Ok ( operational_message_rx )
693
+ Ok ( ( ) )
801
694
}
802
695
803
696
impl SimNet {
@@ -1068,6 +961,7 @@ mod tests {
1068
961
use tokio:: sync:: Mutex ;
1069
962
1070
963
use super :: * ;
964
+ use crate :: channel;
1071
965
use crate :: channel:: ChannelTransport ;
1072
966
use crate :: channel:: sim:: SimAddr ;
1073
967
use crate :: clock:: Clock ;
@@ -1488,45 +1382,6 @@ edges:
1488
1382
assert_eq ! ( records. unwrap( ) . first( ) . unwrap( ) , & expected_record) ;
1489
1383
}
1490
1384
1491
- #[ cfg( target_os = "linux" ) ]
1492
- #[ tokio:: test]
1493
- async fn test_simnet_receive_operational_message ( ) {
1494
- use tokio:: sync:: oneshot;
1495
-
1496
- use crate :: PortId ;
1497
- use crate :: channel:: Tx ;
1498
-
1499
- let proxy_addr = ChannelAddr :: any ( channel:: ChannelTransport :: Unix ) ;
1500
- let mut operational_message_rx = start (
1501
- ChannelAddr :: any ( ChannelTransport :: Unix ) ,
1502
- proxy_addr. clone ( ) ,
1503
- 1000 ,
1504
- )
1505
- . unwrap ( ) ;
1506
- let tx = crate :: channel:: dial ( proxy_addr. clone ( ) ) . unwrap ( ) ;
1507
- let port_id = PortId ( id ! ( test[ 0 ] . actor0) , 0 ) ;
1508
- let spawn_mesh = SpawnMesh {
1509
- system_addr : "unix!@system" . parse ( ) . unwrap ( ) ,
1510
- controller_actor_id : id ! ( controller_world[ 0 ] . actor) ,
1511
- worker_world : id ! ( worker_world) ,
1512
- } ;
1513
- let operational_message = OperationalMessage :: SpawnMesh ( spawn_mesh. clone ( ) ) ;
1514
- let serialized_operational_message = Serialized :: serialize ( & operational_message) . unwrap ( ) ;
1515
- let proxy_message = ProxyMessage :: new ( None , None , serialized_operational_message) ;
1516
- let serialized_proxy_message = Serialized :: serialize ( & proxy_message) . unwrap ( ) ;
1517
- let external_message = MessageEnvelope :: new_unknown ( port_id, serialized_proxy_message) ;
1518
-
1519
- // Send the message to the simnet.
1520
- tx. try_post ( external_message, oneshot:: channel ( ) . 0 ) . unwrap ( ) ;
1521
- // flush doesn't work here because tx.send() delivers the message through real network.
1522
- // We have to wait for the message to enter simnet.
1523
- RealClock . sleep ( Duration :: from_millis ( 1000 ) ) . await ;
1524
- let received_operational_message = operational_message_rx. recv ( ) . await . unwrap ( ) ;
1525
-
1526
- // Check the received message.
1527
- assert_eq ! ( received_operational_message, operational_message) ;
1528
- }
1529
-
1530
1385
#[ tokio:: test]
1531
1386
async fn test_sim_sleep ( ) {
1532
1387
start (
0 commit comments