From 40c72f2d0aed22f070531646f901382b6a4aa653 Mon Sep 17 00:00:00 2001 From: Shayne Fletcher Date: Wed, 9 Jul 2025 13:28:08 -0700 Subject: [PATCH 1/2] : actor_mesh: use commactor in cast_slices (#479) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Summary: - in D77963903, routing was updated to use the `(ActorMeshId, ActorId)` pair as the stream key for sequence number tracking. this change allows different sub-slices of a mesh to safely share a common stream identity as long as they belong to the same logical `ActorMeshId`, avoiding issues like message reordering or duplication due to slice mismatch. as a result, this diff removes the now-unnecessary logic that intersected the user-provided selection with a reified view of the actor mesh's slice. correctness depends on all casts to a given `ActorMeshId` being evaluated consistently against that mesh's slice. - previously, `cast_slices` didn’t perform a true cast; it sent messages point-to-point to each rank in the input slices. now, we replace it with a `Selection` constructed via `Selection::of_ranks` and `dsl::union`, and invoke `cast`. the unused `cast_slices` is removed. 
Reviewed By: mariusae Differential Revision: D77953855 --- hyperactor_mesh/src/actor_mesh.rs | 26 +++---------------- monarch_extension/src/mesh_controller.rs | 33 ++++++++++++++++++++---- monarch_tensor_worker/src/lib.rs | 12 ++++++++- 3 files changed, 42 insertions(+), 29 deletions(-) diff --git a/hyperactor_mesh/src/actor_mesh.rs b/hyperactor_mesh/src/actor_mesh.rs index 4e67c343..c36057c3 100644 --- a/hyperactor_mesh/src/actor_mesh.rs +++ b/hyperactor_mesh/src/actor_mesh.rs @@ -37,8 +37,6 @@ use ndslice::Selection; use ndslice::Shape; use ndslice::ShapeError; use ndslice::Slice; -use ndslice::dsl; -use ndslice::selection::ReifyView; use serde::Deserialize; use serde::Serialize; use tokio::sync::mpsc; @@ -62,7 +60,7 @@ pub(crate) fn actor_mesh_cast( caps: &impl cap::CanSend, actor_mesh_id: ActorMeshId, actor_mesh_shape: &Shape, - proc_mesh_shape: &Shape, + _proc_mesh_shape: &Shape, actor_name: &str, sender: &ActorId, comm_actor_ref: &ActorRef, @@ -77,6 +75,7 @@ where "message_variant" => message.arm().unwrap_or_default(), )); + let slice = actor_mesh_shape.slice().clone(); let message = CastMessageEnvelope::new( actor_mesh_id, sender.clone(), @@ -86,29 +85,10 @@ where None, // TODO: reducer typehash )?; - // Sub-set the selection to the selection that represents the mesh's view - // of the root mesh. We need to do this because the comm actor uses the - // slice as the stream key; thus different sub-slices will result in potentially - // out of order delivery. - // - // TODO: We should repair this by introducing an explicit stream key, associated - // with the root mesh. - let selection_of_slice = proc_mesh_shape - .slice() - .reify_view(actor_mesh_shape.slice()) - .expect("invalid slice"); - let selection = dsl::intersection(selection, selection_of_slice); - comm_actor_ref.send( caps, CastMessage { - dest: Uslice { - // TODO: currently this slice is being used as the stream key - // in comm actor. 
We should change it to an explicit id, maintained - // by the root proc mesh. - slice: proc_mesh_shape.slice().clone(), - selection, - }, + dest: Uslice { slice, selection }, message, }, )?; diff --git a/monarch_extension/src/mesh_controller.rs b/monarch_extension/src/mesh_controller.rs index e3b8affb..a440032b 100644 --- a/monarch_extension/src/mesh_controller.rs +++ b/monarch_extension/src/mesh_controller.rs @@ -7,6 +7,7 @@ */ use std::collections::BTreeMap; +use std::collections::BTreeSet; use std::collections::HashMap; use std::collections::HashSet; use std::collections::VecDeque; @@ -32,6 +33,7 @@ use hyperactor::PortRef; use hyperactor::cap::CanSend; use hyperactor::mailbox::MailboxSenderError; use hyperactor_mesh::Mesh; +use hyperactor_mesh::actor_mesh::ActorMesh; use hyperactor_mesh::actor_mesh::RootActorMesh; use hyperactor_mesh::shared_cell::SharedCell; use hyperactor_mesh::shared_cell::SharedCellRef; @@ -53,7 +55,9 @@ use monarch_messages::worker::WorkerMessage; use monarch_messages::worker::WorkerParams; use monarch_tensor_worker::AssignRankMessage; use monarch_tensor_worker::WorkerActor; +use ndslice::Selection; use ndslice::Slice; +use ndslice::selection; use pyo3::exceptions::PyValueError; use pyo3::prelude::*; use tokio::sync::Mutex; @@ -250,6 +254,7 @@ impl Invocation { } } + #[allow(clippy::result_large_err)] // TODO: Consider reducing the size of `MailboxSenderError`. fn add_user( &mut self, sender: &impl CanSend, @@ -287,6 +292,7 @@ impl Invocation { } } + #[allow(clippy::result_large_err)] // TODO: Consider reducing the size of `MailboxSenderError`. fn complete(&mut self, sender: &impl CanSend) -> Result<(), MailboxSenderError> { let old_status = std::mem::replace(&mut self.status, Status::Complete {}); match old_status { @@ -310,6 +316,7 @@ impl Invocation { /// Incomplete, it may have users that will also become errored. This function /// will return those users so the error can be propagated. 
It does not autmoatically /// propagate the error to avoid deep recursive invocations. + #[allow(clippy::result_large_err)] // TODO: Consider reducing the size of `MailboxSenderError`. fn set_exception( &mut self, sender: &impl CanSend, @@ -447,6 +454,7 @@ impl History { } /// Add an invocation to the history. + #[allow(clippy::result_large_err)] // TODO: Consider reducing the size of `MailboxSenderError`. pub fn add_invocation( &mut self, sender: &impl CanSend, @@ -486,6 +494,7 @@ impl History { /// Propagate worker error to the invocation with the given Seq. This will also propagate /// to all seqs that depend on this seq directly or indirectly. + #[allow(clippy::result_large_err)] // TODO: Consider reducing the size of `MailboxSenderError`. pub fn propagate_exception( &mut self, sender: &impl CanSend, @@ -539,6 +548,7 @@ impl History { /// Mark the given rank as completed up to but excluding the given Seq. This will also purge history for /// any Seqs that are no longer relevant (completed on all ranks). + #[allow(clippy::result_large_err)] // TODO: Consider reducing the size of `MailboxSenderError`. 
pub fn rank_completed( &mut self, sender: &impl CanSend, @@ -623,6 +633,7 @@ impl MeshControllerActor { fn workers(&self) -> SharedCellRef> { self.workers.as_ref().unwrap().borrow().unwrap() } + fn handle_debug( &mut self, this: &Context, @@ -741,7 +752,8 @@ impl Actor for MeshControllerActor { workers .borrow() .unwrap() - .cast_slices(vec![slice.clone()], AssignRankMessage::AssignRank())?; + .cast(selection::dsl::true_(), AssignRankMessage::AssignRank())?; + self.workers = Some(workers); Ok(()) } @@ -796,7 +808,19 @@ impl Handler for MeshControllerActor { ) -> anyhow::Result<()> { match message { ClientToControllerMessage::Send { slices, message } => { - self.workers().cast_slices(slices, message)?; + let selection = slices + .iter() + .map(|slice| { + Selection::of_ranks( + self.workers().shape().slice(), + &slice.iter().collect::>(), + ) + }) + .collect::, _>>()? + .into_iter() + .reduce(selection::dsl::union) + .unwrap_or_else(selection::dsl::false_); + self.workers().cast(selection, message.clone())?; } ClientToControllerMessage::Node { seq, @@ -812,9 +836,8 @@ impl Handler for MeshControllerActor { self.history.drop_refs(refs); } ClientToControllerMessage::SyncAtExit { port } => { - let all_ranks = vec![self.workers().shape().slice().clone()]; - self.workers().cast_slices( - all_ranks, + self.workers().cast( + selection::dsl::true_(), WorkerMessage::RequestStatus { seq: self.history.seq_lower_bound, controller: false, diff --git a/monarch_tensor_worker/src/lib.rs b/monarch_tensor_worker/src/lib.rs index f1438067..5ce115f6 100644 --- a/monarch_tensor_worker/src/lib.rs +++ b/monarch_tensor_worker/src/lib.rs @@ -290,7 +290,17 @@ pub enum AssignRankMessage { } #[async_trait] -#[forward(WorkerMessage)] +impl Handler for WorkerActor { + async fn handle( + &mut self, + cx: &hyperactor::Context, + message: WorkerMessage, + ) -> anyhow::Result<()> { + ::handle(self, cx, message).await + } +} + +#[async_trait] impl WorkerMessageHandler for WorkerActor { async fn 
backend_network_init( &mut self, From c93c56a5c5fd526bc86e4def231a8297e0e74523 Mon Sep 17 00:00:00 2001 From: Shayne Fletcher Date: Wed, 9 Jul 2025 13:28:08 -0700 Subject: [PATCH 2/2] : actor_mesh: remove redundant params Summary: - remove the `DestinationPort` arg from `CastMessageEnvelope::new` since it can be calculated from the other arguments - remove now unnecessary arguments from the utility `actor_mesh::actor_mesh_cast` (proc mesh shape and actor name) - generally make the handling of generics in these interfaces consistent Differential Revision: D78030552 --- hyperactor_mesh/src/actor_mesh.rs | 43 +++++++++++++-------------- hyperactor_mesh/src/comm/multicast.rs | 14 +++++---- hyperactor_mesh/src/reference.rs | 10 +++---- 3 files changed, 34 insertions(+), 33 deletions(-) diff --git a/hyperactor_mesh/src/actor_mesh.rs b/hyperactor_mesh/src/actor_mesh.rs index c36057c3..69bb7ca1 100644 --- a/hyperactor_mesh/src/actor_mesh.rs +++ b/hyperactor_mesh/src/actor_mesh.rs @@ -45,7 +45,6 @@ use crate::CommActor; use crate::Mesh; use crate::comm::multicast::CastMessage; use crate::comm::multicast::CastMessageEnvelope; -use crate::comm::multicast::DestinationPort; use crate::comm::multicast::Uslice; use crate::comm::multicast::set_cast_info_on_headers; use crate::metrics; @@ -54,21 +53,21 @@ use crate::reference::ActorMeshId; use crate::reference::ActorMeshRef; use crate::reference::ProcMeshId; -/// Common implementation for ActorMeshes and ActorMeshRefs to cast an [`M`]-typed message +/// Common implementation for `ActorMesh`s and `ActorMeshRef`s to cast +/// an `M`-typed message #[allow(clippy::result_large_err)] // TODO: Consider reducing the size of `CastError`. 
-pub(crate) fn actor_mesh_cast( +pub(crate) fn actor_mesh_cast( caps: &impl cap::CanSend, actor_mesh_id: ActorMeshId, actor_mesh_shape: &Shape, - _proc_mesh_shape: &Shape, - actor_name: &str, sender: &ActorId, comm_actor_ref: &ActorRef, selection: Selection, message: M, ) -> Result<(), CastError> where - A: RemoteHandles + RemoteHandles>, + A: RemoteActor + RemoteHandles>, + M: Castable + RemoteMessage, { let _ = metrics::ACTOR_MESH_CAST_DURATION.start(hyperactor::kv_pairs!( "message_type" => M::typename(), @@ -76,10 +75,9 @@ where )); let slice = actor_mesh_shape.slice().clone(); - let message = CastMessageEnvelope::new( + let message = CastMessageEnvelope::new::( actor_mesh_id, sender.clone(), - DestinationPort::new::(actor_name.to_string()), actor_mesh_shape.clone(), message, None, // TODO: reducer typehash @@ -101,30 +99,29 @@ pub trait ActorMesh: Mesh { /// The type of actor in the mesh. type Actor: RemoteActor; - /// Cast an [`M`]-typed message to the ranks selected by `sel` - /// in this ActorMesh. + /// Cast an `M`-typed message to the ranks selected by `sel` in + /// this ActorMesh. #[allow(clippy::result_large_err)] // TODO: Consider reducing the size of `CastError`. 
- fn cast(&self, selection: Selection, message: M) -> Result<(), CastError> + fn cast(&self, selection: Selection, message: M) -> Result<(), CastError> where - Self::Actor: RemoteHandles + RemoteHandles>, + Self::Actor: RemoteHandles>, + M: Castable + RemoteMessage, { - actor_mesh_cast::( - self.proc_mesh().client(), - self.id(), - self.shape(), - self.proc_mesh().shape(), - self.name(), - self.proc_mesh().client().actor_id(), - self.proc_mesh().comm_actor(), - selection, - message, + actor_mesh_cast::( + self.proc_mesh().client(), // send capability + self.id(), // actor mesh id (destination mesh) + self.shape(), // actor mesh shape + self.proc_mesh().client().actor_id(), // sender + self.proc_mesh().comm_actor(), // comm actor + selection, // the selected actors + message, // the message ) } /// The ProcMesh on top of which this actor mesh is spawned. fn proc_mesh(&self) -> &ProcMesh; - /// The name global name of actors in this mesh. + /// The name given to the actors in this mesh. fn name(&self) -> &str; fn world_id(&self) -> &WorldId { diff --git a/hyperactor_mesh/src/comm/multicast.rs b/hyperactor_mesh/src/comm/multicast.rs index 56e0267e..371f8e39 100644 --- a/hyperactor_mesh/src/comm/multicast.rs +++ b/hyperactor_mesh/src/comm/multicast.rs @@ -62,19 +62,23 @@ pub struct CastMessageEnvelope { impl CastMessageEnvelope { /// Create a new CastMessageEnvelope. 
- pub fn new( + pub fn new( actor_mesh_id: ActorMeshId, sender: ActorId, - dest_port: DestinationPort, shape: Shape, - message: T, + message: M, reducer_typehash: Option, - ) -> Result { + ) -> Result + where + A: RemoteActor + RemoteHandles>, + M: Castable + RemoteMessage, + { let data = ErasedUnbound::try_from_message(message)?; + let actor_name = actor_mesh_id.1.to_string(); Ok(Self { actor_mesh_id, sender, - dest_port, + dest_port: DestinationPort::new::(actor_name), data, reducer_typehash, shape, diff --git a/hyperactor_mesh/src/reference.rs b/hyperactor_mesh/src/reference.rs index 2777e2d2..caf20b83 100644 --- a/hyperactor_mesh/src/reference.rs +++ b/hyperactor_mesh/src/reference.rs @@ -14,6 +14,7 @@ use std::marker::PhantomData; use hyperactor::ActorRef; use hyperactor::Named; use hyperactor::RemoteHandles; +use hyperactor::RemoteMessage; use hyperactor::actor::RemoteActor; use hyperactor::cap; use hyperactor::message::Castable; @@ -54,7 +55,7 @@ macro_rules! mesh_id { )] pub struct ProcMeshId(pub String); -/// Actor Mesh ID. Tuple of the ProcMesh ID and Actor Mesh ID. +/// Actor Mesh ID. Tuple of the ProcMesh ID and actor name. #[derive( Debug, Serialize, @@ -123,7 +124,7 @@ impl ActorMeshRef { /// Cast an [`M`]-typed message to the ranks selected by `sel` /// in this ActorMesh. #[allow(clippy::result_large_err)] // TODO: Consider reducing the size of `CastError`. - pub fn cast( + pub fn cast( &self, caps: &(impl cap::CanSend + cap::CanOpenPort), selection: Selection, @@ -131,13 +132,12 @@ impl ActorMeshRef { ) -> Result<(), CastError> where A: RemoteHandles + RemoteHandles>, + M: Castable + RemoteMessage, { - actor_mesh_cast::( + actor_mesh_cast::( caps, self.mesh_id.clone(), self.shape(), - self.proc_mesh_shape(), - self.name(), caps.mailbox().actor_id(), &self.comm_actor_ref, selection,