Skip to content

[monarch] refactor monarch/ callsites to use monarch.actor #438

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Closed
wants to merge 6 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 1 addition & 2 deletions examples/grpo_actor.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,9 +13,8 @@
import torch
import torch.nn as nn
import torch.optim as optim
from monarch.actor_mesh import Actor, ActorMeshRef, endpoint

from monarch.proc_mesh import proc_mesh
from monarch.actor import Actor, ActorMeshRef, endpoint, proc_mesh
from monarch.rdma import RDMABuffer
from torch.distributions import Categorical, kl_divergence

Expand Down
10 changes: 5 additions & 5 deletions examples/notebooks/ping_pong.ipynb
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@
"metadata": {},
"source": [
"## Hello World\n",
"Actors are spawned in Process meshes via the `monarch.proc_mesh` API. For those familiar with distributed systems, it can be helpful to think of each Actor as a server with endpoints that can be called."
"Actors are spawned in Process meshes via the `monarch.actor` API. For those familiar with distributed systems, it can be helpful to think of each Actor as a server with endpoints that can be called."
]
},
{
Expand All @@ -43,8 +43,8 @@
"source": [
"import asyncio\n",
"\n",
"from monarch.proc_mesh import proc_mesh, ProcMesh\n",
"from monarch.actor_mesh import Actor, endpoint, current_rank\n",
"from monarch.actor import proc_mesh, ProcMesh\n",
"from monarch.actor import Actor, endpoint, current_rank\n",
"\n",
"NUM_ACTORS=4\n",
"\n",
Expand Down Expand Up @@ -168,8 +168,8 @@
"source": [
"import asyncio\n",
"\n",
"from monarch.proc_mesh import proc_mesh, ProcMesh\n",
"from monarch.actor_mesh import Actor, endpoint, current_rank\n",
"from monarch.actor import proc_mesh, ProcMesh\n",
"from monarch.actor import Actor, endpoint, current_rank\n",
"\n",
"class ExampleActor(Actor):\n",
" def __init__(self, actor_name):\n",
Expand Down
4 changes: 2 additions & 2 deletions examples/notebooks/spmd_ddp.ipynb
Original file line number Diff line number Diff line change
Expand Up @@ -24,8 +24,8 @@
"import torch.nn as nn\n",
"import torch.optim as optim\n",
"\n",
"from monarch.proc_mesh import proc_mesh\n",
"from monarch.actor_mesh import Actor, current_rank, endpoint\n",
"from monarch.actor import proc_mesh\n",
"from monarch.actor import Actor, current_rank, endpoint\n",
"\n",
"from torch.nn.parallel import DistributedDataParallel as DDP\n",
"\n",
Expand Down
23 changes: 14 additions & 9 deletions python/monarch/_src/actor/proc_mesh.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,8 +22,6 @@
TypeVar,
)

from monarch._rust_bindings import has_tensor_engine

from monarch._rust_bindings.hyperactor_extension.alloc import ( # @manual=//monarch/monarch_extension:monarch_extension # @manual=//monarch/monarch_extension:monarch_extension
Alloc,
AllocConstraints,
Expand All @@ -44,11 +42,21 @@
from monarch._src.actor.future import Future
from monarch._src.actor.shape import MeshTrait

HAS_TENSOR_ENGINE = False
try:
# TODO: while the tensor_engine submodule doesn't exist yet, use the
# availability of monarch.rdma as a proxy.
# type: ignore
from monarch.rdma import RDMAManager # @manual

HAS_TENSOR_ENGINE = True
except ImportError:
pass


if TYPE_CHECKING:
Tensor = Any
DeviceMesh = Any
RDMAManager = Any


T = TypeVar("T")
Expand Down Expand Up @@ -84,10 +92,7 @@ def __init__(
self._auto_reload_actor: Optional[AutoReloadActor] = None
self._maybe_device_mesh: Optional["DeviceMesh"] = _device_mesh
self._stopped = False
if _mock_shape is None and has_tensor_engine():
# type: ignore[21]
from monarch.rdma import RDMAManager # @manual

if _mock_shape is None and HAS_TENSOR_ENGINE:
# type: ignore[21]
self._rdma_manager = self._spawn_blocking("rdma_manager", RDMAManager)

Expand Down Expand Up @@ -192,7 +197,7 @@ async def _spawn_nonblocking(

@property
def _device_mesh(self) -> "DeviceMesh":
if not has_tensor_engine():
if not HAS_TENSOR_ENGINE:
raise RuntimeError(
"DeviceMesh is not available because tensor_engine was not compiled (USE_TENSOR_ENGINE=0)"
)
Expand Down Expand Up @@ -308,7 +313,7 @@ def local_proc_mesh(*, gpus: Optional[int] = None, hosts: int = 1) -> Future[Pro
)


_BOOTSTRAP_MAIN = "monarch.bootstrap_main"
_BOOTSTRAP_MAIN = "monarch._src.actor.bootstrap_main"


def _get_bootstrap_args() -> tuple[str, Optional[list[str]], dict[str, str]]:
Expand Down
2 changes: 1 addition & 1 deletion python/monarch/_testing.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,12 +14,12 @@

import monarch_supervisor
from monarch._src.actor.shape import NDSlice
from monarch.actor import proc_mesh, ProcMesh
from monarch.common.client import Client
from monarch.common.device_mesh import DeviceMesh
from monarch.common.invocation import DeviceException, RemoteException
from monarch.controller.backend import ProcessBackend
from monarch.mesh_controller import spawn_tensor_engine
from monarch.proc_mesh import proc_mesh, ProcMesh
from monarch.python_local_mesh import PythonLocalContext
from monarch.rust_local_mesh import (
local_mesh,
Expand Down
13 changes: 10 additions & 3 deletions python/monarch/actor/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,27 +12,34 @@
Accumulator,
Actor,
ActorError,
ActorMeshRef,
current_actor_name,
current_rank,
current_size,
endpoint,
MonarchContext,
Point,
send,
ValueMesh,
)
from monarch._src.actor.future import Future
from monarch._src.actor.proc_mesh import proc_mesh, ProcMesh
from monarch._src.actor.proc_mesh import local_proc_mesh, proc_mesh, ProcMesh

__all__ = [
"Accumulator",
"Actor",
"ActorError",
"ActorMeshRef",
"current_actor_name",
"current_rank",
"current_size",
"endpoint",
"Future",
"local_proc_mesh",
"MonarchContext",
"ValueMesh",
"Point",
"proc_mesh",
"ProcMesh",
"Future",
"send",
"ValueMesh",
]
2 changes: 1 addition & 1 deletion python/monarch/mesh_controller.py
Original file line number Diff line number Diff line change
Expand Up @@ -34,8 +34,8 @@
from monarch._rust_bindings.monarch_hyperactor.proc import ( # @manual=//monarch/monarch_extension:monarch_extension
ActorId,
)
from monarch._src.actor.actor_mesh import Port, PortTuple
from monarch._src.actor.shape import NDSlice
from monarch.actor_mesh import Port, PortTuple
from monarch.common import messages
from monarch.common.controller_api import TController
from monarch.common.invocation import Seq
Expand Down
3 changes: 1 addition & 2 deletions python/tests/error_test_binary.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,8 +13,7 @@

from monarch._rust_bindings.monarch_extension.panic import panicking_function

from monarch.actor_mesh import Actor, endpoint, send
from monarch.proc_mesh import proc_mesh
from monarch.actor import Actor, endpoint, proc_mesh, send


class ErrorActor(Actor):
Expand Down
3 changes: 1 addition & 2 deletions python/tests/test_actor_error.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,8 +10,7 @@

import pytest
from monarch._rust_bindings.monarch_hyperactor.proc_mesh import ProcEvent
from monarch.actor_mesh import Actor, ActorError, endpoint, send
from monarch.proc_mesh import local_proc_mesh, proc_mesh
from monarch.actor import Actor, ActorError, endpoint, local_proc_mesh, proc_mesh


class ExceptionActor(Actor):
Expand Down
10 changes: 8 additions & 2 deletions python/tests/test_allocator.py
Original file line number Diff line number Diff line change
Expand Up @@ -38,8 +38,14 @@
StaticRemoteAllocInitializer,
TorchXRemoteAllocInitializer,
)
from monarch.actor_mesh import Actor, current_rank, current_size, endpoint, ValueMesh
from monarch.proc_mesh import ProcMesh
from monarch.actor import (
Actor,
current_rank,
current_size,
endpoint,
ProcMesh,
ValueMesh,
)
from monarch.tools.mesh_spec import MeshSpec, ServerSpec
from monarch.tools.network import get_sockaddr

Expand Down
5 changes: 2 additions & 3 deletions python/tests/test_python_actors.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,8 +12,6 @@
from types import ModuleType
from unittest.mock import AsyncMock, patch

import monarch

import pytest

import torch
Expand All @@ -23,6 +21,7 @@
from monarch.actor import (
Accumulator,
Actor,
ActorError,
current_actor_name,
current_rank,
current_size,
Expand Down Expand Up @@ -545,7 +544,7 @@ def _patch_output(msg):
breakpoints = await debug_client.list.call_one()
assert len(breakpoints) == 0

with pytest.raises(monarch.actor.ActorError, match="ValueError: bad rank"):
with pytest.raises(ActorError, match="ValueError: bad rank"):
await fut


Expand Down
2 changes: 1 addition & 1 deletion python/tests/test_tensor_engine.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,8 +8,8 @@
import pytest
import torch
from monarch import remote
from monarch.actor import proc_mesh
from monarch.mesh_controller import spawn_tensor_engine
from monarch.proc_mesh import proc_mesh


two_gpu = pytest.mark.skipif(
Expand Down
2 changes: 1 addition & 1 deletion setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -166,7 +166,7 @@ def run(self):
entry_points={
"console_scripts": [
"monarch=monarch.tools.cli:main",
"monarch_bootstrap=monarch.bootstrap_main:invoke_main",
"monarch_bootstrap=monarch._src.actor.bootstrap_main:invoke_main",
],
},
rust_extensions=rust_extensions,
Expand Down