Skip to content

Commit dc93f88

Browse files
authored
[Ray] Use main pool as owner when autoscale disabled (#2878)
1 parent cfb23ef commit dc93f88

File tree

2 files changed

+25
-14
lines changed

2 files changed

+25
-14
lines changed

mars/services/storage/core.py

Lines changed: 0 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -571,17 +571,6 @@ async def _setup_storage(
571571
):
572572
backend = get_storage_backend(storage_backend)
573573
storage_config = storage_config or dict()
574-
575-
from ..cluster import ClusterAPI
576-
577-
if backend.name == "ray":
578-
try:
579-
cluster_api = await ClusterAPI.create(self.address)
580-
supervisor_address = (await cluster_api.get_supervisors())[0]
581-
# ray storage backend need to set supervisor as owner to avoid data lost when worker dies.
582-
storage_config["owner"] = supervisor_address
583-
except mo.ActorNotExist:
584-
pass
585574
init_params, teardown_params = await backend.setup(**storage_config)
586575
client = backend(**init_params)
587576
self._init_params[band_name][storage_backend] = init_params

mars/services/storage/worker/service.py

Lines changed: 25 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -36,9 +36,31 @@ async def start(self):
3636
backends = storage_configs.get("backends")
3737
options = storage_configs.get("default_config", dict())
3838
transfer_block_size = options.get("transfer_block_size", None)
39-
backend_config = {
40-
backend: storage_configs.get(backend, dict()) for backend in backends
41-
}
39+
backend_config = {}
40+
for backend in backends:
41+
storage_config = storage_configs.get(backend, dict())
42+
backend_config[backend] = storage_config
43+
if backend == "ray":
44+
# Specify supervisor as ray owner will be costly when mars do shuffle which there will be m*n objects
45+
# need to specify supervisor as owner, so enable it only for auto scale to avoid data lost when scale
46+
# in. This limit can be removed when ray support ownership transfer.
47+
if (
48+
self._config.get("scheduling", {})
49+
.get("autoscale", {})
50+
.get("enabled", False)
51+
):
52+
try:
53+
from ...cluster.api import ClusterAPI
54+
55+
cluster_api = await ClusterAPI.create(self._address)
56+
supervisor_address = (await cluster_api.get_supervisors())[0]
57+
# ray storage backend need to set supervisor as owner to avoid data lost when worker dies.
58+
owner = supervisor_address
59+
except mo.ActorNotExist:
60+
owner = self._address
61+
else:
62+
owner = self._address
63+
storage_config["owner"] = owner
4264

4365
await mo.create_actor(
4466
StorageManagerActor,

0 commit comments

Comments
 (0)