From 5d9aa64c609fd1d8725e0152a3c46cdf7ad80b35 Mon Sep 17 00:00:00 2001 From: zdevito Date: Mon, 14 Jul 2025 13:34:32 -0700 Subject: [PATCH] [16/n] tensor engine: enum-ify PythonMessage PythonMessage was getting a bit stringly typed. I've sectioned into 4 enum case (CallMethod, Result, Exception, Uninit) which capture when you have to pay attention to things like rank/response_port. I have to further add functionality in a follow-up to get actor ordering correct. Differential Revision: [D78295683](https://our.internmc.facebook.com/intern/diff/D78295683/) [ghstack-poisoned] --- monarch_extension/src/mesh_controller.rs | 10 +- monarch_hyperactor/src/actor.rs | 99 +++++++++------- monarch_hyperactor/src/mailbox.rs | 6 +- monarch_tensor_worker/src/stream.rs | 83 ++++++++------ .../monarch_hyperactor/actor.pyi | 64 +++++++---- python/monarch/_src/actor/actor_mesh.py | 107 +++++++++++------- python/monarch/mesh_controller.py | 2 + python/tests/_monarch/test_actor.py | 7 +- python/tests/_monarch/test_mailbox.py | 44 +++++-- 9 files changed, 264 insertions(+), 158 deletions(-) diff --git a/monarch_extension/src/mesh_controller.rs b/monarch_extension/src/mesh_controller.rs index 336148af..d53aff97 100644 --- a/monarch_extension/src/mesh_controller.rs +++ b/monarch_extension/src/mesh_controller.rs @@ -37,6 +37,7 @@ use hyperactor_mesh::actor_mesh::RootActorMesh; use hyperactor_mesh::shared_cell::SharedCell; use hyperactor_mesh::shared_cell::SharedCellRef; use monarch_hyperactor::actor::PythonMessage; +use monarch_hyperactor::actor::PythonMessageKind; use monarch_hyperactor::mailbox::PyPortId; use monarch_hyperactor::ndslice::PySlice; use monarch_hyperactor::proc_mesh::PyProcMesh; @@ -334,7 +335,7 @@ impl Invocation { Some(PortInfo { port, ranks }) => { *unreported_exception = None; for rank in ranks.iter() { - let msg = exception.as_ref().clone().with_rank(rank); + let msg = exception.as_ref().clone().into_rank(rank); port.send(sender, msg)?; } } @@ -527,7 +528,7 @@ impl History { .call1((exception.backtrace, traceback, rank)) .unwrap(); let data: Vec = pickle.call1((exe,)).unwrap().extract().unwrap(); - PythonMessage::new_from_buf("exception".to_string(), data, None, Some(rank)) + PythonMessage::new_from_buf(PythonMessageKind::Exception { rank: Some(rank) }, data) })); let mut invocation = invocation.lock().unwrap(); @@ -570,7 +571,10 @@ impl History { Some(exception) => exception.as_ref().clone(), None => { // the byte string is just a Python None - PythonMessage::new("result".to_string(), b"\x80\x04N.", None, None) + PythonMessage::new_from_buf( + PythonMessageKind::Result { rank: None }, + b"\x80\x04N.".to_vec(), + ) } }; port.send(sender, result)?; diff --git a/monarch_hyperactor/src/actor.rs b/monarch_hyperactor/src/actor.rs index 25cfd485..c203e7a6 100644 --- a/monarch_hyperactor/src/actor.rs +++ b/monarch_hyperactor/src/actor.rs @@ -171,33 +171,52 @@ impl PickledMessageClientActor { } } +#[pyclass(module = "monarch._rust_bindings.monarch_hyperactor.actor")] +#[derive(Clone, Debug, Serialize, Deserialize, Named, PartialEq)] +pub enum PythonMessageKind { + CallMethod { + name: String, + response_port: Option, + }, + Result { + rank: Option, + }, + Exception { + rank: Option, + }, + Uninit {}, +} + +impl Default for PythonMessageKind { + fn default() -> Self { + PythonMessageKind::Uninit {} + } +} + #[pyclass(frozen, module = "monarch._rust_bindings.monarch_hyperactor.actor")] -#[derive(Default, Clone, Serialize, Deserialize, Named, PartialEq)] +#[derive(Clone, Serialize, Deserialize, Named, PartialEq, Default)] pub struct PythonMessage { - pub method: String, - pub message: ByteBuf, - pub response_port: Option, - pub rank: Option, + pub kind: PythonMessageKind, + pub message: Vec, } impl PythonMessage { - pub fn with_rank(self, rank: usize) -> PythonMessage { - PythonMessage { - rank: Some(rank), - ..self - } + pub fn new_from_buf(kind: PythonMessageKind, message: Vec) -> Self { + Self { kind, message } } - pub fn new_from_buf( - method: String, - message: Vec, - response_port: Option, - rank: Option, - ) -> Self { - Self { - method, - message: message.into(), - response_port, - rank, + + pub fn into_rank(self, rank: usize) -> Self { + let rank = Some(rank); + match self.kind { + PythonMessageKind::Result { .. } => PythonMessage { + kind: PythonMessageKind::Result { rank }, + message: self.message, + }, + PythonMessageKind::Exception { .. } => PythonMessage { + kind: PythonMessageKind::Exception { rank }, + message: self.message, + }, + _ => panic!("PythonMessage is not a response but {:?}", self), } } } @@ -205,7 +224,7 @@ impl PythonMessage { impl std::fmt::Debug for PythonMessage { fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { f.debug_struct("PythonMessage") - .field("method", &self.method) + .field("kind", &self.kind) .field( "message", &hyperactor::data::HexFmt(self.message.as_slice()).to_string(), @@ -216,48 +235,39 @@ impl std::fmt::Debug for PythonMessage { impl Unbind for PythonMessage { fn unbind(&self, bindings: &mut Bindings) -> anyhow::Result<()> { - self.response_port.unbind(bindings) + match &self.kind { + PythonMessageKind::CallMethod { response_port, .. } => response_port.unbind(bindings), + _ => Ok(()), + } } } impl Bind for PythonMessage { fn bind(&mut self, bindings: &mut Bindings) -> anyhow::Result<()> { - self.response_port.bind(bindings) + match &mut self.kind { + PythonMessageKind::CallMethod { response_port, .. } => response_port.bind(bindings), + _ => Ok(()), + } } } #[pymethods] impl PythonMessage { #[new] - #[pyo3(signature = (method, message, response_port, rank))] - pub fn new( - method: String, - message: &[u8], - response_port: Option, - rank: Option, - ) -> Self { - Self::new_from_buf(method, message.into(), response_port, rank) + #[pyo3(signature = (kind, message))] + pub fn new(kind: PythonMessageKind, message: &[u8]) -> Self { + PythonMessage::new_from_buf(kind, message.to_vec()) } #[getter] - fn method(&self) -> &String { - &self.method + fn kind(&self) -> PythonMessageKind { + self.kind.clone() } #[getter] fn message<'a>(&self, py: Python<'a>) -> Bound<'a, PyBytes> { PyBytes::new(py, self.message.as_ref()) } - - #[getter] - fn response_port(&self) -> Option { - self.response_port.clone() - } - - #[getter] - fn rank(&self) -> Option { - self.rank - } } #[pyclass(module = "monarch._rust_bindings.monarch_hyperactor.actor")] @@ -583,6 +593,7 @@ pub fn register_python_bindings(hyperactor_mod: &Bound<'_, PyModule>) -> PyResul hyperactor_mod.add_class::()?; hyperactor_mod.add_class::()?; hyperactor_mod.add_class::()?; + hyperactor_mod.add_class::()?; hyperactor_mod.add_class::()?; Ok(()) } diff --git a/monarch_hyperactor/src/mailbox.rs b/monarch_hyperactor/src/mailbox.rs index f8321b1f..7acbb375 100644 --- a/monarch_hyperactor/src/mailbox.rs +++ b/monarch_hyperactor/src/mailbox.rs @@ -44,6 +44,7 @@ use serde::Deserialize; use serde::Serialize; use crate::actor::PythonMessage; +use crate::actor::PythonMessageKind; use crate::proc::PyActorId; use crate::runtime::signal_safe_block_on; use crate::shape::PyShape; @@ -528,7 +529,8 @@ impl PythonOncePortReceiver { Named, PartialEq, FromPyObject, - IntoPyObject + IntoPyObject, + Debug )] pub enum EitherPortRef { Unbounded(PythonPortRef), @@ -610,7 +612,7 @@ impl Accumulator for PythonAccumulator { fn accumulate(&self, state: &mut Self::State, update: Self::Update) -> anyhow::Result<()> { Python::with_gil(|py: Python<'_>| { // Initialize state if it is empty. - if state.message.is_empty() && state.method.is_empty() { + if matches!(state.kind, PythonMessageKind::Uninit {}) { *state = self .accumulator .getattr(py, "initial_state")? diff --git a/monarch_tensor_worker/src/stream.rs b/monarch_tensor_worker/src/stream.rs index 023453ef..c7031ae1 100644 --- a/monarch_tensor_worker/src/stream.rs +++ b/monarch_tensor_worker/src/stream.rs @@ -39,6 +39,7 @@ use hyperactor::proc::Proc; use monarch_hyperactor::actor::LocalPythonMessage; use monarch_hyperactor::actor::PythonActor; use monarch_hyperactor::actor::PythonMessage; +use monarch_hyperactor::actor::PythonMessageKind; use monarch_hyperactor::mailbox::EitherPortRef; use monarch_messages::controller::ControllerMessageClient; use monarch_messages::controller::Seq; @@ -1009,10 +1010,10 @@ impl StreamActor { .extract() .unwrap(); Ok(PythonMessage::new_from_buf( - "result".to_string(), + PythonMessageKind::Result { + rank: Some(worker_actor_id.rank()), + }, data, - None, - Some(worker_actor_id.rank()), )) }) }); @@ -1686,12 +1687,13 @@ impl StreamMessageHandler for StreamActor { let (send, recv) = cx.open_once_port(); let send = send.bind(); let send = EitherPortRef::Once(send.into()); - let message = PythonMessage { - method: params.method, - message: params.args_kwargs_tuple.into(), - response_port: Some(send), - rank: None, - }; + let message = PythonMessage::new_from_buf( + PythonMessageKind::CallMethod { + name: params.method, + response_port: Some(send), + }, + params.args_kwargs_tuple.into(), + ); let local_state: Result> = Python::with_gil(|py| { params @@ -1713,31 +1715,44 @@ impl StreamMessageHandler for StreamActor { local_state: Some(local_state?), }; actor_handle.send(message)?; - let result = recv.recv().await?.with_rank(worker_actor_id.rank()); - if result.method == "exception" { - // If result has "exception" as its kind, then - // we need to unpickle and turn it into a WorkerError - // and call remote_function_failed otherwise the - // controller assumes the object is correct and doesn't handle - // dependency tracking correctly. - let err = Python::with_gil(|py| -> Result { - let err = py - .import("pickle") - .unwrap() - .call_method1("loads", (result.message.into_vec(),))?; - Ok(WorkerError { - worker_actor_id, - backtrace: err.to_string(), - }) - })?; - self.controller_actor - .remote_function_failed(cx, params.seq, err) - .await?; - } else { - let result = Serialized::serialize(&result).unwrap(); - self.controller_actor - .fetch_result(cx, params.seq, Ok(result)) - .await?; + let result = recv.recv().await?; + match result.kind { + PythonMessageKind::Exception { .. } => { + // If result has "exception" as its kind, then + // we need to unpickle and turn it into a WorkerError + // and call remote_function_failed otherwise the + // controller assumes the object is correct and doesn't handle + // dependency tracking correctly. + let err = Python::with_gil(|py| -> Result { + let err = py + .import("pickle") + .unwrap() + .call_method1("loads", (result.message,))?; + Ok(WorkerError { + worker_actor_id, + backtrace: err.to_string(), + }) + })?; + self.controller_actor + .remote_function_failed(cx, params.seq, err) + .await?; + } + PythonMessageKind::Result { .. } => { + let result = PythonMessage::new_from_buf( + PythonMessageKind::Result { + rank: Some(worker_actor_id.rank()), + }, + result.message, + ); + let result = Serialized::serialize(&result).unwrap(); + self.controller_actor + .fetch_result(cx, params.seq, Ok(result)) + .await?; + } + _ => panic!( + "Unexpected response kind from PythonActor: {:?}", + result.kind + ), } Ok(()) } diff --git a/python/monarch/_rust_bindings/monarch_hyperactor/actor.pyi b/python/monarch/_rust_bindings/monarch_hyperactor/actor.pyi index 12f84386..c6cedd71 100644 --- a/python/monarch/_rust_bindings/monarch_hyperactor/actor.pyi +++ b/python/monarch/_rust_bindings/monarch_hyperactor/actor.pyi @@ -8,7 +8,7 @@ import abc -from typing import final, List, Optional, Protocol +from typing import Any, final, List, Optional, Protocol, Type from monarch._rust_bindings.monarch_hyperactor.mailbox import ( Mailbox, @@ -96,39 +96,64 @@ class PickledMessageClientActor: """The actor id of the actor.""" ... +class PythonMessageKind: + @classmethod + @property + def Result(cls) -> "Type[Result]": ... + @classmethod + @property + def Exception(cls) -> "Type[Exception]": ... + @classmethod + @property + def CallMethod(cls) -> "Type[CallMethod]": ... + @classmethod + @property + def Uninit(cls) -> "Type[Uninit]": ... + +class Result(PythonMessageKind): + def __init__(self, rank: Optional[int]) -> None: ... + @property + def rank(self) -> int | None: ... + +class Exception(PythonMessageKind): + def __init__(self, rank: Optional[int]) -> None: ... + @property + def rank(self) -> int | None: ... + +class CallMethod(PythonMessageKind): + def __init__( + self, name: str, response_port: PortRef | OncePortRef | None + ) -> None: ... + @property + def name(self) -> str: ... + @property + def response_port(self) -> PortRef | OncePortRef | None: ... + +class Init(PythonMessageKind): + def __init__(self, response_port: PortRef | OncePortRef | None) -> None: ... + @property + def response_port(self) -> PortRef | OncePortRef | None: ... + +class Uninit(PythonMessageKind): + pass + @final class PythonMessage: """ A message that carries a python method and a pickled message that contains the arguments to the method. """ - def __init__( self, - method: str, + kind: PythonMessageKind, message: bytes, - response_port: PortRef | OncePortRef | None, - rank: int | None, ) -> None: ... - @property - def method(self) -> str: - """The method of the message.""" - ... - @property def message(self) -> bytes: """The pickled arguments.""" ... - - @property - def response_port(self) -> PortRef | OncePortRef | None: - """The response port.""" - ... - @property - def rank(self) -> Optional[int]: - """If this message is a response, the rank of the actor in the original broadcast that send the request.""" - ... + def kind(self) -> PythonMessageKind: ... class UndeliverableMessageEnvelope: """ @@ -185,4 +210,5 @@ class Actor(Protocol): shape: Shape, message: PythonMessage, panic_flag: PanicFlag, + local_state: List[Any] | None, ) -> None: ... diff --git a/python/monarch/_src/actor/actor_mesh.py b/python/monarch/_src/actor/actor_mesh.py index bbff1b5e..99a912ce 100644 --- a/python/monarch/_src/actor/actor_mesh.py +++ b/python/monarch/_src/actor/actor_mesh.py @@ -45,7 +45,11 @@ TypeVar, ) -from monarch._rust_bindings.monarch_hyperactor.actor import PanicFlag, PythonMessage +from monarch._rust_bindings.monarch_hyperactor.actor import ( + PanicFlag, + PythonMessage, + PythonMessageKind, +) from monarch._rust_bindings.monarch_hyperactor.actor_mesh import PythonActorMesh from monarch._rust_bindings.monarch_hyperactor.mailbox import ( Mailbox, @@ -395,10 +399,10 @@ def _send( refs = [obj for obj in objects if hasattr(obj, "__monarch_ref__")] if not refs: message = PythonMessage( - self._name, + PythonMessageKind.CallMethod( + self._name, None if port is None else port._port_ref + ), bytes, - None if port is None else port._port_ref, - None, ) self._actor_mesh.cast(message, selection) else: @@ -523,16 +527,31 @@ def endpoint(method): class Port(Generic[R]): def __init__( - self, port_ref: PortRef | OncePortRef, mailbox: Mailbox, rank: Optional[int] + self, + port_ref: PortRef | OncePortRef | None, + mailbox: Mailbox, + rank: Optional[int], ) -> None: self._port_ref = port_ref self._mailbox = mailbox self._rank = rank - def send(self, method: str, obj: R) -> None: + def send(self, obj: R) -> None: + if self._port_ref is None: + return + self._port_ref.send( + self._mailbox, + PythonMessage(PythonMessageKind.Result(self._rank), _pickle(obj)), + ) + + def exception(self, obj: Exception) -> None: + # we deliver each error exactly once, so if there is no port to respond to, + # the error is sent to the current actor as an exception. + if self._port_ref is None: + raise obj from None self._port_ref.send( self._mailbox, - PythonMessage(method, _pickle(obj), None, self._rank), + PythonMessage(PythonMessageKind.Exception(self._rank), _pickle(obj)), ) @@ -601,12 +620,13 @@ def _blocking_recv(self) -> R: def _process(self, msg: PythonMessage) -> R: # TODO: Try to do something more structured than a cast here payload = cast(R, unflatten(msg.message, itertools.repeat(self._mailbox))) - if msg.method == "result": - return payload - else: - assert msg.method == "exception" - # pyre-ignore - raise payload + match msg.kind: + case PythonMessageKind.Result(): + return payload + case PythonMessageKind.Exception(): + raise cast(Exception, payload) + case _: + raise ValueError(f"Unexpected message kind: {msg.kind}") def recv(self) -> "Future[R]": return Future(lambda: self._recv(), self._blocking_recv) @@ -614,9 +634,12 @@ def recv(self) -> "Future[R]": class RankedPortReceiver(PortReceiver[Tuple[int, R]]): def _process(self, msg: PythonMessage) -> Tuple[int, R]: - if msg.rank is None: - raise ValueError("RankedPort receiver got a message without a rank") - return msg.rank, super()._process(msg) + rank = getattr(msg.kind, "rank", None) + if rank is None: + raise ValueError( + f"RankedPort receiver got a message without a rank {msg}", + ) + return rank, super()._process(msg) singleton_shape = Shape([], NDSlice(offset=0, sizes=[], strides=[])) @@ -651,11 +674,15 @@ async def handle( ) -> None: if local_state is None: local_state = itertools.repeat(mailbox) - port = ( - Port(message.response_port, mailbox, rank) - if message.response_port - else None - ) + + match message.kind: + case PythonMessageKind.CallMethod(response_port=response_port): + pass + case _: + response_port = None + # response_port can be None. If so, then sending to port will drop the response, + # and raise any exceptions to the caller. + port = Port(response_port, mailbox, rank) try: ctx: MonarchContext = MonarchContext( mailbox, mailbox.actor_id.proc_id, Point(rank, shape) @@ -666,12 +693,16 @@ async def handle( args, kwargs = unflatten(message.message, local_state) - if message.method == "__init__": - Class, *args = args - self.instance = Class(*args, **kwargs) - if port is not None: - port.send("result", None) - return None + match message.kind: + case PythonMessageKind.CallMethod(name=name): + method = name + if method == "__init__": + Class, *args = args + self.instance = Class(*args, **kwargs) + port.send(None) + return None + case _: + raise ValueError(f"Unexpected message kind: {message.kind}") if self.instance is None: # This could happen because of the following reasons. Both @@ -686,18 +717,18 @@ async def handle( # mixed the usage of cast and direct send. raise AssertionError( f""" - actor object is missing when executing method {message.method} + actor object is missing when executing method {method} on actor {mailbox.actor_id} """ ) - the_method = getattr(self.instance, message.method)._method + the_method = getattr(self.instance, method)._method if inspect.iscoroutinefunction(the_method): async def instrumented(): enter_span( the_method.__module__, - message.method, + method, str(ctx.mailbox.actor_id), ) try: @@ -714,26 +745,16 @@ async def instrumented(): result = await instrumented() else: - enter_span( - the_method.__module__, message.method, str(ctx.mailbox.actor_id) - ) + enter_span(the_method.__module__, method, str(ctx.mailbox.actor_id)) result = the_method(self.instance, *args, **kwargs) self._maybe_exit_debugger() exit_span() - if port is not None: - port.send("result", result) + port.send(result) except Exception as e: self._post_mortem_debug(e.__traceback__) traceback.print_exc() - s = ActorError(e) - - # The exception is delivered to exactly one of: - # (1) our caller, (2) our supervisor - if port is not None: - port.send("exception", s) - else: - raise s from None + port.exception(ActorError(e)) except BaseException as e: self._post_mortem_debug(e.__traceback__) # A BaseException can be thrown in the case of a Rust panic. diff --git a/python/monarch/mesh_controller.py b/python/monarch/mesh_controller.py index 5c4c472d..0f550f16 100644 --- a/python/monarch/mesh_controller.py +++ b/python/monarch/mesh_controller.py @@ -181,6 +181,7 @@ def shutdown( self._shutdown = True sender, receiver = PortTuple.create(self._mesh_controller._mailbox, once=True) + assert sender._port_ref is not None self._mesh_controller.sync_at_exit(sender._port_ref.port_id) receiver.recv().get(timeout=60) # we are not expecting anything more now, because we already @@ -205,6 +206,7 @@ def new_node_nocoalesce( if future is not None: # method annotation is a lie to make Client happy port, slice = cast("Tuple[Port[Any], NDSlice]", future) + assert port._port_ref is not None response_port = (port._port_ref.port_id, slice) self._mesh_controller.node(seq, defs, uses, response_port, tracebacks) return seq diff --git a/python/tests/_monarch/test_actor.py b/python/tests/_monarch/test_actor.py index f2d249d6..31c06519 100644 --- a/python/tests/_monarch/test_actor.py +++ b/python/tests/_monarch/test_actor.py @@ -8,7 +8,10 @@ import time -from monarch._rust_bindings.monarch_hyperactor.actor import PythonMessage +from monarch._rust_bindings.monarch_hyperactor.actor import ( + PythonMessage, + PythonMessageKind, +) def test_python_message() -> None: @@ -19,6 +22,6 @@ def test_python_message() -> None: payload: str = "a" * 2**30 # 1gb blob: bytes = payload.encode("utf-8") t = time.time() - PythonMessage(method, blob, None, None) + PythonMessage(PythonMessageKind.CallMethod(method, None), blob) t_spent = time.time() - t assert t_spent < 1 diff --git a/python/tests/_monarch/test_mailbox.py b/python/tests/_monarch/test_mailbox.py index 949f2f47..cd1468f5 100644 --- a/python/tests/_monarch/test_mailbox.py +++ b/python/tests/_monarch/test_mailbox.py @@ -8,11 +8,19 @@ import asyncio import pickle -from typing import Any, Callable, cast, final, Generic, List, TypeVar +from typing import Any, Callable, cast, final, Generic, List, TYPE_CHECKING, TypeVar import monarch -from monarch._rust_bindings.monarch_hyperactor.actor import PanicFlag, PythonMessage +from monarch._rust_bindings.monarch_hyperactor.actor import ( + PanicFlag, + PythonMessage, + PythonMessageKind, +) + +if TYPE_CHECKING: + from monarch._rust_bindings.monarch_hyperactor.actor import CallMethod + from monarch._rust_bindings.monarch_hyperactor.alloc import AllocConstraints, AllocSpec @@ -41,7 +49,7 @@ def __call__(self, left: PythonMessage, right: PythonMessage) -> PythonMessage: l: U = cast(U, pickle.loads(left.message)) r: U = cast(U, pickle.loads(right.message)) result: U = self._reduce_f(l, r) - return PythonMessage(left.method, pickle.dumps(result), None, None) + return PythonMessage(left.kind, pickle.dumps(result)) @final @@ -62,12 +70,13 @@ def __call__(self, state: PythonMessage, update: PythonMessage) -> PythonMessage s: S = cast(S, pickle.loads(state.message)) u: U = cast(U, pickle.loads(update.message)) result: S = self._accumulate_f(s, u) - return PythonMessage(state.method, pickle.dumps(result), None, None) + return PythonMessage(state.kind, pickle.dumps(result)) @property def initial_state(self) -> PythonMessage: return PythonMessage( - " @Accumulator.initial_state", pickle.dumps(self._initial_state), None, None + PythonMessageKind.CallMethod(" @Accumulator.initial_state", None), + pickle.dumps(self._initial_state), ) @property @@ -97,7 +106,11 @@ def my_accumulate(state: str, update: int) -> str: def post_message(value: int) -> None: port_ref.send( - mailbox, PythonMessage("test_accumulator", pickle.dumps(value), None, None) + mailbox, + PythonMessage( + PythonMessageKind.CallMethod("test_accumulator", None), + pickle.dumps(value), + ), ) async def recv_message() -> str: @@ -126,12 +139,19 @@ async def handle( panic_flag: PanicFlag, local_state: List[Any] | None, ) -> None: - assert message.response_port is not None - reply_port = message.response_port - reply_port.send(mailbox, PythonMessage("echo", message.message, None, None)) + call_method = cast("CallMethod", message.kind) + assert call_method.response_port is not None + reply_port = call_method.response_port + reply_port.send( + mailbox, + PythonMessage(PythonMessageKind.CallMethod("echo", None), message.message), + ) for i in range(100): reply_port.send( - mailbox, PythonMessage("echo", pickle.dumps(f"msg{i}"), None, None) + mailbox, + PythonMessage( + PythonMessageKind.CallMethod("echo", None), pickle.dumps(f"msg{i}") + ), ) @@ -152,7 +172,9 @@ def my_reduce(state: str, update: str) -> str: actor_mesh.cast( Selection.from_string("*"), - PythonMessage("echo", pickle.dumps("start"), port_ref, None), + PythonMessage( + PythonMessageKind.CallMethod("echo", port_ref), pickle.dumps("start") + ), ) messge = await asyncio.wait_for(receiver.recv(), timeout=5)