Skip to content

Commit 65d84b9

Browse files
authored
Add initial implementation of scheduling service (#2111)
1 parent 6b8eec1 commit 65d84b9

Some content is hidden

Large commits have some content hidden by default. Use the search box below to find content that may be hidden.

71 files changed

+3537
-567
lines changed

.github/workflows/run-tests.sh

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -21,8 +21,10 @@ if [ -z "$NO_COMMON_TESTS" ]; then
2121
pytest $PYTEST_CONFIG --cov-config .coveragerc-threaded mars/tensor mars/dataframe mars/web \
2222
mars/learn mars/remote mars/storage mars/lib
2323
mv .coverage build/.coverage.tensor.file
24+
pytest $PYTEST_CONFIG --cov-config .coveragerc-threaded --forked mars/oscar mars/services
25+
mv .coverage build/.coverage.oscar.file
2426
pytest $PYTEST_CONFIG --cov-config .coveragerc --forked --ignore mars/tensor --ignore mars/dataframe \
25-
--ignore mars/learn --ignore mars/remote mars
27+
--ignore mars/learn --ignore mars/remote --ignore mars/services --ignore mars/oscar mars
2628
mv .coverage build/.coverage.main.file
2729
coverage combine build/ && coverage report
2830

mars/core/graph/core.pyx

Lines changed: 29 additions & 26 deletions
Original file line numberDiff line numberDiff line change
@@ -310,6 +310,9 @@ cdef class DirectedGraph:
310310
return f'"{val}"'
311311
return val
312312

313+
def _extract_operands(self, node):
314+
return [node.op]
315+
313316
def to_dot(self, graph_attrs=None, node_attrs=None, trunc_key=5, result_chunk_keys=None):
314317
sio = StringIO()
315318
sio.write('digraph {\n')
@@ -328,32 +331,32 @@ cdef class DirectedGraph:
328331

329332
visited = set()
330333
for node in self.iter_nodes():
331-
op = node.op
332-
op_name = type(op).__name__
333-
if op.stage is not None:
334-
op_name = f'{op_name}:{op.stage.name}'
335-
if op.key in visited:
336-
continue
337-
for input_chunk in (op.inputs or []):
338-
if input_chunk.key not in visited:
339-
sio.write(f'"Chunk:{input_chunk.key[:trunc_key]}" {chunk_style}\n')
340-
visited.add(input_chunk.key)
341-
if op.key not in visited:
342-
sio.write(f'"{op_name}:{op.key[:trunc_key]}" {operand_style}\n')
343-
visited.add(op.key)
344-
sio.write(f'"Chunk:{input_chunk.key[:trunc_key]}" -> "{op_name}:{op.key[:5]}"\n')
345-
346-
for output_chunk in (op.outputs or []):
347-
if output_chunk.key not in visited:
348-
tmp_chunk_style = chunk_style
349-
if result_chunk_keys and output_chunk.key in result_chunk_keys:
350-
tmp_chunk_style = '[shape=box,style=filled,fillcolor=cadetblue1]'
351-
sio.write(f'"Chunk:{output_chunk.key[:trunc_key]}" {tmp_chunk_style}\n')
352-
visited.add(output_chunk.key)
353-
if op.key not in visited:
354-
sio.write(f'"{op_name}:{op.key[:trunc_key]}" {operand_style}\n')
355-
visited.add(op.key)
356-
sio.write(f'"{op_name}:{op.key[:trunc_key]}" -> "Chunk:{output_chunk.key[:5]}"\n')
334+
for op in self._extract_operands(node):
335+
op_name = type(op).__name__
336+
if op.stage is not None:
337+
op_name = f'{op_name}:{op.stage.name}'
338+
if op.key in visited:
339+
continue
340+
for input_chunk in (op.inputs or []):
341+
if input_chunk.key not in visited:
342+
sio.write(f'"Chunk:{input_chunk.key[:trunc_key]}" {chunk_style}\n')
343+
visited.add(input_chunk.key)
344+
if op.key not in visited:
345+
sio.write(f'"{op_name}:{op.key[:trunc_key]}" {operand_style}\n')
346+
visited.add(op.key)
347+
sio.write(f'"Chunk:{input_chunk.key[:trunc_key]}" -> "{op_name}:{op.key[:5]}"\n')
348+
349+
for output_chunk in (op.outputs or []):
350+
if output_chunk.key not in visited:
351+
tmp_chunk_style = chunk_style
352+
if result_chunk_keys and output_chunk.key in result_chunk_keys:
353+
tmp_chunk_style = '[shape=box,style=filled,fillcolor=cadetblue1]'
354+
sio.write(f'"Chunk:{output_chunk.key[:trunc_key]}" {tmp_chunk_style}\n')
355+
visited.add(output_chunk.key)
356+
if op.key not in visited:
357+
sio.write(f'"{op_name}:{op.key[:trunc_key]}" {operand_style}\n')
358+
visited.add(op.key)
359+
sio.write(f'"{op_name}:{op.key[:trunc_key]}" -> "Chunk:{output_chunk.key[:5]}"\n')
357360

358361
sio.write('}')
359362
return sio.getvalue()

mars/deploy/oscar/base_config.yml

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,8 @@ services:
55
- meta
66
- lifecycle
77
- task
8+
- scheduling
9+
- subtask
810
- web
911
cluster:
1012
backend: fixed
Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,5 @@
11
inherits: '@default'
22
task:
33
task_processor_cls: mars.services.task.supervisor.tests.CheckedTaskProcessor
4-
subtask_processor_cls: mars.services.task.worker.tests.CheckedSubtaskProcessor
4+
subtask:
5+
subtask_processor_cls: mars.services.subtask.worker.tests.CheckedSubtaskProcessor

mars/deploy/oscar/tests/test_checked_session.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -23,7 +23,7 @@
2323
from mars.deploy.oscar.service import _load_config
2424
from mars.deploy.oscar.tests.session import new_test_session, CONFIG_FILE
2525
from mars.services.task.supervisor.tests import CheckedTaskProcessor
26-
from mars.services.task.worker.tests import CheckedSubtaskProcessor
26+
from mars.services.subtask.worker.tests import CheckedSubtaskProcessor
2727

2828

2929
class FakeCheckedTaskProcessor(CheckedTaskProcessor):
@@ -81,7 +81,7 @@ def test_check_task_processor(setup):
8181

8282
def test_check_subtask_processor(setup):
8383
config = _load_config(CONFIG_FILE)
84-
config['task']['subtask_processor_cls'] = \
84+
config['subtask']['subtask_processor_cls'] = \
8585
'mars.deploy.oscar.tests.' \
8686
'test_checked_session.FakeCheckedSubtaskProcessor'
8787

mars/deploy/oscar/tests/test_ray.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -18,10 +18,10 @@
1818
import mars.tensor as mt
1919
from mars.core.session import get_default_session, new_session
2020
from mars.deploy.oscar.ray import new_cluster, _load_config
21+
from mars.deploy.oscar.tests import test_local
2122
from mars.serialization.ray import register_ray_serializers
2223
from mars.tests.core import require_ray
23-
from ....utils import lazy_import
24-
from . import test_local
24+
from mars.utils import lazy_import
2525

2626
ray = lazy_import('ray')
2727

mars/lib/aio/__init__.py

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,7 @@
1313
# limitations under the License.
1414

1515
import asyncio
16+
import contextlib
1617
import sys
1718

1819
from .file import AioFileObject, AioFilesystem
@@ -28,6 +29,10 @@
2829
asyncio.get_running_loop = get_running_loop
2930
asyncio.create_task = asyncio.ensure_future
3031

32+
# patch async generator
33+
from async_generator import asynccontextmanager
34+
contextlib.asynccontextmanager = asynccontextmanager
35+
3136

3237
def create_lock(loop=None):
3338
async def _create_lock():

mars/oscar/backends/core.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -55,7 +55,7 @@ async def _listen(self, client: Client):
5555
except (ConnectionError, BrokenPipeError):
5656
# close failed, ignore it
5757
pass
58-
raise ServerClosed(f'Remote server {client.dest_address} closed')
58+
raise ServerClosed(f'Remote server {client.dest_address} closed') from None
5959
future = self._client_to_message_futures[client].pop(message.message_id)
6060
future.set_result(message)
6161
except DeserializeMessageFailed as e: # pragma: no cover

mars/oscar/backends/pool.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -828,7 +828,7 @@ async def handle_control_command(self,
828828
processor.result = await super().handle_control_command(message)
829829
elif message.control_message_type == ControlMessageType.stop:
830830
timeout, force = message.content if message.content is not None \
831-
else (None, None)
831+
else (None, False)
832832
await self.stop_sub_pool(
833833
message.address,
834834
self.sub_processes[message.address],

mars/services/cluster/api/oscar.py

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -147,7 +147,7 @@ async def get_nodes_info(self, nodes: List[str] = None, role: NodeRole = None,
147147
return await self._node_info_ref.get_nodes_info(
148148
nodes=nodes, role=role, env=env, resource=resource, state=state)
149149

150-
async def get_all_bands(self) -> Dict[BandType, int]:
150+
async def get_all_bands(self, role: NodeRole = None, watch: bool = False) -> Dict[BandType, int]:
151151
"""
152152
Get all bands that can be used for computation.
153153
@@ -156,7 +156,9 @@ async def get_all_bands(self) -> Dict[BandType, int]:
156156
band_to_slots : dict
157157
Band to n_slot.
158158
"""
159-
return await self._node_info_ref.get_all_bands()
159+
if watch:
160+
return await self._node_info_ref.watch_all_bands(role)
161+
return await self._node_info_ref.get_all_bands(role)
160162

161163
async def get_bands(self):
162164
"""
@@ -226,6 +228,7 @@ async def create(cls: Type[APIType], address: str, **kw) -> APIType:
226228
address=address),
227229
mo.create_actor(NodeInfoUploaderActor, NodeRole.WORKER,
228230
interval=kw.get('upload_interval'),
231+
band_to_slots=kw.get('band_to_slots'),
229232
use_gpu=kw.get('use_gpu', False),
230233
uid=NodeInfoUploaderActor.default_uid(),
231234
address=address),

mars/services/cluster/locator.py

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -13,13 +13,16 @@
1313
# limitations under the License.
1414

1515
import asyncio
16+
import logging
1617
from typing import List, Optional
1718

1819
from mars import oscar as mo
1920
from mars.lib.uhashring import HashRing
2021
from mars.services.cluster.backends import AbstractClusterBackend, get_cluster_backend
2122
from mars.utils import extensible
2223

24+
logger = logging.getLogger(__name__)
25+
2326

2427
class SupervisorLocatorActor(mo.Actor):
2528
_backend: Optional[AbstractClusterBackend]

mars/services/cluster/supervisor/node_info.py

Lines changed: 17 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -108,8 +108,9 @@ def get_nodes_info(self, nodes: List[str] = None, role: NodeRole = None,
108108
)
109109
return ret_infos
110110

111-
def get_all_bands(self) -> Dict[BandType, int]:
112-
nodes = self._role_to_nodes.get(NodeRole.WORKER, list())
111+
def get_all_bands(self, role: NodeRole = None) -> Dict[BandType, int]:
112+
role = role or NodeRole.WORKER
113+
nodes = self._role_to_nodes.get(role, [])
113114
band_slots = dict()
114115
for node in nodes:
115116
node_resource = self._node_infos[node].resource
@@ -136,3 +137,17 @@ async def waiter():
136137
self._role_to_events[role].remove(event)
137138

138139
return waiter()
140+
141+
async def watch_all_bands(self, role: NodeRole = None):
142+
role = role or NodeRole.WORKER
143+
event = asyncio.Event()
144+
self._role_to_events[role].add(event)
145+
146+
async def waiter():
147+
try:
148+
await event.wait()
149+
return self.get_all_bands(role=role)
150+
finally:
151+
self._role_to_events[role].remove(event)
152+
153+
return waiter()

mars/services/cluster/tests/test_api.py

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -60,13 +60,18 @@ async def test_api(actor_pool):
6060
assert pool_addr in nodes_info
6161
assert 'custom_usage' in nodes_info[pool_addr]['resource']['numa-0']
6262

63+
bands = await api.get_all_bands()
64+
assert (pool_addr, 'numa-0') in bands
65+
6366
with pytest.raises(asyncio.TimeoutError):
6467
await asyncio.wait_for(api.get_supervisors(watch=True), timeout=0.1)
6568
with pytest.raises(asyncio.TimeoutError):
6669
await asyncio.wait_for(api.get_supervisor_refs(
6770
[TestActor.default_uid()], watch=True), timeout=0.1)
6871
with pytest.raises(asyncio.TimeoutError):
6972
await asyncio.wait_for(api.watch_nodes(NodeRole.WORKER), timeout=0.1)
73+
with pytest.raises(asyncio.TimeoutError):
74+
await asyncio.wait_for(api.get_all_bands(watch=True), timeout=0.1)
7075

7176

7277
@pytest.mark.asyncio

mars/services/cluster/uploader.py

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -64,6 +64,9 @@ async def mark_node_ready(self):
6464
# upload info in time to reduce latency
6565
await self.upload_node_info(False)
6666

67+
def is_node_ready(self):
68+
return self._upload_enabled
69+
6770
async def upload_node_info(self, call_next: bool = True):
6871
try:
6972
if not self._info.env:

mars/services/meta/tests/test_api.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -92,7 +92,7 @@ async def test_meta_web_api():
9292

9393
async with pool:
9494
config = {
95-
"services": ["cluster", "session", "meta", "task", "web"],
95+
"services": ["cluster", "session", "meta", "web"],
9696
"cluster": {
9797
"backend": "fixed",
9898
"lookup_address": pool.external_address,

mars/services/meta/tests/test_service.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -26,7 +26,7 @@ async def test_meta_service():
2626

2727
async with pool:
2828
config = {
29-
"services": ["cluster", "session", "meta", "task"],
29+
"services": ["cluster", "session", "meta"],
3030
"cluster": {
3131
"backend": "fixed",
3232
"lookup_address": pool.external_address,

mars/services/scheduling/__init__.py

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -11,3 +11,5 @@
1111
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
1212
# See the License for the specific language governing permissions and
1313
# limitations under the License.
14+
15+
from .api import MockSchedulingAPI, SchedulingAPI

0 commit comments

Comments
 (0)