Skip to content

Commit cf2fc1f

Browse files
thomasywang authored and
facebook-github-bot committed
No more operational messages (#473)
Summary: Operational messages were used to signal to the simulator that it should perform certain actions, such as growing or shrinking the mesh. They were needed because the Python and Rust sides ran in separate processes and had to communicate via messages. Now that everything runs in the same process, this can be done in memory, so operational messages are removed. Differential Revision: D77941643
1 parent 7fb56d0 commit cf2fc1f

File tree

8 files changed

+99
-384
lines changed

8 files changed

+99
-384
lines changed

hyperactor/src/simnet.rs

Lines changed: 10 additions & 156 deletions
Original file line numberDiff line numberDiff line change
@@ -33,12 +33,9 @@ use serde::Serialize;
3333
use serde::Serializer;
3434
use serde_with::serde_as;
3535
use tokio::sync::Mutex;
36-
use tokio::sync::SetError;
3736
use tokio::sync::mpsc;
38-
use tokio::sync::mpsc::Sender;
3937
use tokio::sync::mpsc::UnboundedReceiver;
4038
use tokio::sync::mpsc::UnboundedSender;
41-
use tokio::sync::mpsc::error::SendError;
4239
use tokio::task::JoinError;
4340
use tokio::task::JoinHandle;
4441
use tokio::time::interval;
@@ -49,7 +46,6 @@ use crate::ActorId;
4946
use crate::Mailbox;
5047
use crate::Named;
5148
use crate::OncePortRef;
52-
use crate::WorldId;
5349
use crate::channel;
5450
use crate::channel::ChannelAddr;
5551
use crate::channel::Rx;
@@ -313,18 +309,6 @@ pub enum SimNetError {
313309
#[error("proxy not available: {0}")]
314310
ProxyNotAvailable(String),
315311

316-
/// Unable to send message to the simulator.
317-
#[error(transparent)]
318-
OperationalMessageSendError(#[from] SendError<OperationalMessage>),
319-
320-
/// Setting the operational message sender which is already set.
321-
#[error(transparent)]
322-
OperationalMessageSenderSetError(#[from] SetError<Sender<OperationalMessage>>),
323-
324-
/// Missing OperationalMessageReceiver.
325-
#[error("missing operational message receiver")]
326-
MissingOperationalMessageReceiver,
327-
328312
/// Cannot deliver the message because destination address is missing.
329313
#[error("missing destination address")]
330314
MissingDestinationAddress,
@@ -361,8 +345,6 @@ pub struct SimNetHandle {
361345
/// Handle to a running proxy server that forwards external messages
362346
/// into the simnet.
363347
proxy_handle: ProxyHandle,
364-
/// A sender to forward simulator operational messages.
365-
operational_message_tx: UnboundedSender<OperationalMessage>,
366348
/// A watch-channel sender used to publish training script state updates.
367349
/// Receivers are obtained by subscribing to this sender (see `tokio::sync::watch`).
368350
training_script_state_tx: tokio::sync::watch::Sender<TrainingScriptState>,
@@ -489,82 +471,6 @@ impl SimNetHandle {
489471

490472
pub(crate) type Topology = DashMap<SimNetEdge, SimNetEdgeInfo>;
491473

492-
/// The message to spawn a simulated mesh.
493-
#[derive(Debug, Serialize, Deserialize, PartialEq, Clone)]
494-
pub struct SpawnMesh {
495-
/// The system address.
496-
pub system_addr: ChannelAddr,
497-
/// The controller actor ID.
498-
pub controller_actor_id: ActorId,
499-
/// The worker world.
500-
pub worker_world: WorldId,
501-
}
502-
503-
impl SpawnMesh {
504-
/// Creates a new SpawnMesh.
505-
pub fn new(
506-
system_addr: ChannelAddr,
507-
controller_actor_id: ActorId,
508-
worker_world: WorldId,
509-
) -> Self {
510-
Self {
511-
system_addr,
512-
controller_actor_id,
513-
worker_world,
514-
}
515-
}
516-
}
517-
518-
/// An OperationalMessage is a message to control the simulator to do tasks such as
519-
/// spawning or killing actors.
520-
#[derive(Debug, Serialize, Deserialize, PartialEq, Clone, Named)]
521-
pub enum OperationalMessage {
522-
/// Kill the world with given world_id.
523-
KillWorld(String),
524-
/// Spawn actors in a mesh.
525-
SpawnMesh(SpawnMesh),
526-
/// Update training script state.
527-
SetTrainingScriptState(TrainingScriptState),
528-
}
529-
530-
/// Message Event that can be sent to the simulator.
531-
#[derive(Debug)]
532-
pub struct SimOperation {
533-
/// Sender to send OperationalMessage to the simulator.
534-
operational_message_tx: UnboundedSender<OperationalMessage>,
535-
operational_message: OperationalMessage,
536-
}
537-
538-
impl SimOperation {
539-
/// Creates a new SimOperation.
540-
pub fn new(
541-
operational_message_tx: UnboundedSender<OperationalMessage>,
542-
operational_message: OperationalMessage,
543-
) -> Self {
544-
Self {
545-
operational_message_tx,
546-
operational_message,
547-
}
548-
}
549-
}
550-
551-
#[async_trait]
552-
impl Event for SimOperation {
553-
async fn handle(&self) -> Result<(), SimNetError> {
554-
self.operational_message_tx
555-
.send(self.operational_message.clone())?;
556-
Ok(())
557-
}
558-
559-
fn duration_ms(&self) -> u64 {
560-
0
561-
}
562-
563-
fn summary(&self) -> String {
564-
format!("SimOperation: {:?}", self.operational_message)
565-
}
566-
}
567-
568474
/// A ProxyMessage is a message that SimNet proxy receives.
569475
/// The message may request the SimNet to send the payload in the message field from
570476
/// src to dst if addr field exists.
@@ -658,7 +564,6 @@ impl ProxyHandle {
658564
proxy_addr: ChannelAddr,
659565
event_tx: UnboundedSender<(Box<dyn Event>, bool, Option<SimulatorTimeInstant>)>,
660566
pending_event_count: Arc<AtomicUsize>,
661-
operational_message_tx: UnboundedSender<OperationalMessage>,
662567
) -> anyhow::Result<Self> {
663568
let (addr, mut rx) = channel::serve::<MessageEnvelope>(proxy_addr).await?;
664569
tracing::info!("SimNet serving external traffic on {}", &addr);
@@ -672,26 +577,18 @@ impl ProxyHandle {
672577
#[allow(clippy::disallowed_methods)]
673578
if let Ok(Ok(msg)) = timeout(Duration::from_millis(100), rx.recv()).await {
674579
let proxy_message: ProxyMessage = msg.deserialized().unwrap();
675-
let event: Box<dyn Event> = match proxy_message.dest_addr {
676-
Some(dest_addr) => Box::new(MessageDeliveryEvent::new(
580+
if let Some(dest_addr) = proxy_message.dest_addr {
581+
let event = Box::new(MessageDeliveryEvent::new(
677582
proxy_message.sender_addr,
678583
dest_addr,
679584
proxy_message.data,
680-
)),
681-
None => {
682-
let operational_message: OperationalMessage =
683-
proxy_message.data.deserialized().unwrap();
684-
Box::new(SimOperation::new(
685-
operational_message_tx.clone(),
686-
operational_message,
687-
))
585+
));
586+
if let Err(e) = event_tx.send((event, true, None)) {
587+
tracing::error!("error sending message to simnet: {:?}", e);
588+
} else {
589+
pending_event_count
590+
.fetch_add(1, std::sync::atomic::Ordering::SeqCst);
688591
}
689-
};
690-
691-
if let Err(e) = event_tx.send((event, true, None)) {
692-
tracing::error!("error sending message to simnet: {:?}", e);
693-
} else {
694-
pending_event_count.fetch_add(1, std::sync::atomic::Ordering::SeqCst);
695592
}
696593
}
697594
if stop_signal.load(Ordering::SeqCst) {
@@ -729,7 +626,7 @@ pub fn start(
729626
private_addr: ChannelAddr,
730627
proxy_addr: ChannelAddr,
731628
max_duration_ms: u64,
732-
) -> anyhow::Result<UnboundedReceiver<OperationalMessage>> {
629+
) -> anyhow::Result<()> {
733630
// Construct a topology with one node: the default A.
734631
let address_book: DashSet<ChannelAddr> = DashSet::new();
735632
address_book.insert(private_addr.clone());
@@ -775,14 +672,11 @@ pub fn start(
775672
.await
776673
})
777674
}));
778-
let (operational_message_tx, operational_message_rx) =
779-
mpsc::unbounded_channel::<OperationalMessage>();
780675

781676
let proxy_handle = block_on(ProxyHandle::start(
782677
proxy_addr,
783678
event_tx.clone(),
784679
pending_event_count.clone(),
785-
operational_message_tx.clone(),
786680
))
787681
.map_err(|err| SimNetError::ProxyNotAvailable(err.to_string()))?;
788682

@@ -792,12 +686,11 @@ pub fn start(
792686
config,
793687
pending_event_count,
794688
proxy_handle,
795-
operational_message_tx,
796689
training_script_state_tx,
797690
stop_signal,
798691
});
799692

800-
Ok(operational_message_rx)
693+
Ok(())
801694
}
802695

803696
impl SimNet {
@@ -1488,45 +1381,6 @@ edges:
14881381
assert_eq!(records.unwrap().first().unwrap(), &expected_record);
14891382
}
14901383

1491-
#[cfg(target_os = "linux")]
1492-
#[tokio::test]
1493-
async fn test_simnet_receive_operational_message() {
1494-
use tokio::sync::oneshot;
1495-
1496-
use crate::PortId;
1497-
use crate::channel::Tx;
1498-
1499-
let proxy_addr = ChannelAddr::any(channel::ChannelTransport::Unix);
1500-
let mut operational_message_rx = start(
1501-
ChannelAddr::any(ChannelTransport::Unix),
1502-
proxy_addr.clone(),
1503-
1000,
1504-
)
1505-
.unwrap();
1506-
let tx = crate::channel::dial(proxy_addr.clone()).unwrap();
1507-
let port_id = PortId(id!(test[0].actor0), 0);
1508-
let spawn_mesh = SpawnMesh {
1509-
system_addr: "unix!@system".parse().unwrap(),
1510-
controller_actor_id: id!(controller_world[0].actor),
1511-
worker_world: id!(worker_world),
1512-
};
1513-
let operational_message = OperationalMessage::SpawnMesh(spawn_mesh.clone());
1514-
let serialized_operational_message = Serialized::serialize(&operational_message).unwrap();
1515-
let proxy_message = ProxyMessage::new(None, None, serialized_operational_message);
1516-
let serialized_proxy_message = Serialized::serialize(&proxy_message).unwrap();
1517-
let external_message = MessageEnvelope::new_unknown(port_id, serialized_proxy_message);
1518-
1519-
// Send the message to the simnet.
1520-
tx.try_post(external_message, oneshot::channel().0).unwrap();
1521-
// flush doesn't work here because tx.send() delivers the message through real network.
1522-
// We have to wait for the message to enter simnet.
1523-
RealClock.sleep(Duration::from_millis(1000)).await;
1524-
let received_operational_message = operational_message_rx.recv().await.unwrap();
1525-
1526-
// Check the received message.
1527-
assert_eq!(received_operational_message, operational_message);
1528-
}
1529-
15301384
#[tokio::test]
15311385
async fn test_sim_sleep() {
15321386
start(

0 commit comments

Comments
 (0)