Skip to content

Commit 7ee3006

Browse files
committed
FIX_BATCH
1 parent ff0e925 commit 7ee3006

File tree

14 files changed

+490
-307
lines changed

14 files changed

+490
-307
lines changed

mars/services/scheduling/api/oscar.py

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,7 @@
1616

1717
from .... import oscar as mo
1818
from ....lib.aio import alru_cache
19-
from ...subtask import Subtask
19+
from ...subtask import Subtask, SubtaskResult
2020
from ..core import SubtaskScheduleSummary
2121
from .core import AbstractSchedulingAPI
2222

@@ -112,7 +112,7 @@ async def cancel_subtasks(
112112

113113
async def finish_subtasks(
114114
self,
115-
subtask_ids: List[str],
115+
subtask_results: List[SubtaskResult],
116116
bands: List[Tuple] = None,
117117
schedule_next: bool = True,
118118
):
@@ -129,7 +129,7 @@ async def finish_subtasks(
129129
schedule_next
130130
whether to schedule succeeding subtasks
131131
"""
132-
await self._manager_ref.finish_subtasks(subtask_ids, bands, schedule_next)
132+
await self._manager_ref.finish_subtasks(subtask_results, bands, schedule_next)
133133

134134

135135
class MockSchedulingAPI(SchedulingAPI):

mars/services/scheduling/supervisor/manager.py

Lines changed: 153 additions & 114 deletions
Large diffs are not rendered by default.

mars/services/scheduling/supervisor/queueing.py

Lines changed: 131 additions & 79 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@
2424
from ....lib.aio import alru_cache
2525
from ....metrics import Metrics
2626
from ....resource import ZeroResource
27+
from ....typing import BandType
2728
from ....utils import dataslots
2829
from ...subtask import Subtask
2930
from ...task import TaskAPI
@@ -48,6 +49,7 @@ class SubtaskQueueingActor(mo.Actor):
4849
_stid_to_bands: DefaultDict[str, List[Tuple]]
4950
_stid_to_items: Dict[str, HeapItem]
5051
_band_queues: DefaultDict[Tuple, List[HeapItem]]
52+
_submit_requests: List[Optional[Dict[BandType, int]]]
5153

5254
@classmethod
5355
def gen_uid(cls, session_id: str):
@@ -61,6 +63,10 @@ def __init__(self, session_id: str, submit_period: Union[float, int] = None):
6163
# so that we can ensure band queue is busy if the band queue is not empty.
6264
self._band_queues = defaultdict(list)
6365

66+
self._submit_requests = []
67+
self._submit_request_event = asyncio.Event()
68+
self._submit_request_task = None
69+
6470
self._cluster_api = None
6571
self._slots_ref = None
6672
self._assigner_ref = None
@@ -69,7 +75,6 @@ def __init__(self, session_id: str, submit_period: Union[float, int] = None):
6975
self._band_watch_task = None
7076
self._max_enqueue_id = 0
7177

72-
self._periodical_submit_task = None
7378
self._submit_period = submit_period or _DEFAULT_SUBMIT_PERIOD
7479
self._submitted_subtask_number = Metrics.gauge(
7580
"mars.band.submitted_subtask_number",
@@ -133,23 +138,13 @@ async def watch_bands():
133138
AssignerActor.gen_uid(self._session_id), address=self.address
134139
)
135140

136-
if self._submit_period > 0:
137-
self._periodical_submit_task = self.ref().periodical_submit.tell_delay(
138-
delay=self._submit_period
139-
)
141+
self._submit_request_task = asyncio.create_task(self._submission_task_func())
140142

141143
async def __pre_destroy__(self):
142144
self._band_watch_task.cancel()
143-
if self._periodical_submit_task is not None: # pragma: no branch
144-
self._periodical_submit_task.cancel()
145-
146-
async def periodical_submit(self):
147-
await self.ref().submit_subtasks.tell()
148-
self._periodical_submit_task = self.ref().periodical_submit.tell_delay(
149-
delay=self._submit_period
150-
)
145+
if self._submit_request_task is not None: # pragma: no branch
146+
self._submit_request_task.cancel()
151147

152-
@alru_cache
153148
async def _get_task_api(self):
154149
return await TaskAPI.create(self._session_id, self.address)
155150

@@ -180,114 +175,171 @@ async def add_subtasks(
180175
self._max_enqueue_id += 1
181176
heapq.heappush(self._band_queues[band], heap_item)
182177
logger.debug(
183-
"Subtask %s enqueued to band %s excluded from %s.",
178+
"Subtask %s enqueued to band %s. exclude_bands=%s.",
184179
subtask.subtask_id,
185180
band,
186181
exclude_bands,
187182
)
188183
logger.debug("%d subtasks enqueued", len(subtasks))
189184

190-
async def submit_subtasks(self, band: Tuple = None, limit: Optional[int] = None):
191-
logger.debug("Submitting subtasks with limit %s", limit)
185+
def submit_subtasks(self, band_to_limit: Dict[BandType, int] = None):
186+
self._submit_requests.append(band_to_limit)
187+
self._submit_request_event.set()
188+
189+
async def _submission_task_func(self):
190+
while True:
191+
try:
192+
periodical_triggered = False
193+
if not self._submit_requests: # pragma: no branch
194+
try:
195+
if self._submit_period:
196+
await asyncio.wait_for(
197+
self._submit_request_event.wait(), self._submit_period
198+
)
199+
else:
200+
await self._submit_request_event.wait()
201+
202+
self._submit_request_event.clear()
203+
except asyncio.TimeoutError:
204+
periodical_triggered = True
205+
206+
requests = self._submit_requests
207+
self._submit_requests = []
208+
if not periodical_triggered and not requests: # pragma: no cover
209+
continue
210+
211+
merged_band_to_limit = dict()
212+
for req in requests:
213+
if req is None:
214+
merged_band_to_limit = None
215+
break
216+
merged_band_to_limit.update(req)
217+
await self._submit_subtask_request(merged_band_to_limit)
218+
except asyncio.CancelledError:
219+
break
220+
221+
async def _submit_subtask_request(self, band_to_limit: Dict[BandType, int] = None):
222+
if band_to_limit:
223+
logger.debug(
224+
"TMP_QUEUE_PROBE: Submitting subtasks with limits: %r", band_to_limit
225+
)
192226

193-
if not limit and band not in self._band_to_resource:
227+
if not self._band_to_resource or any(
228+
not limit and band not in self._band_to_resource
229+
for band, limit in band_to_limit or ()
230+
):
194231
self._band_to_resource = await self._cluster_api.get_all_bands()
195232

196-
bands = [band] if band is not None else list(self._band_to_resource.keys())
197-
submit_aio_tasks = []
198-
manager_ref = await self._get_manager_ref()
233+
if not band_to_limit:
234+
band_to_limit = {band: None for band in self._band_to_resource.keys()}
199235

200236
apply_delays = []
201237
submit_items_list = []
202238
submitted_bands = []
203239

204-
for band in bands:
205-
band_limit = limit or (
206-
self._band_to_resource[band].num_cpus
207-
or self._band_to_resource[band].num_gpus
208-
)
209-
task_queue = self._band_queues[band]
210-
submit_items = dict()
211-
while (
212-
self._ensure_top_item_valid(task_queue)
213-
and len(submit_items) < band_limit
214-
):
215-
item = heapq.heappop(task_queue)
216-
submit_items[item.subtask.subtask_id] = item
217-
218-
subtask_ids = list(submit_items)
219-
if not subtask_ids:
220-
continue
221-
222-
submitted_bands.append(band)
223-
submit_items_list.append(submit_items)
224-
225-
# Before hbo, when a manager finish a subtask, it will schedule one subtask successfully because
226-
# there is a slot idle. But now we have memory requirements, so the subtask may apply resource
227-
# from supervisor failed. In such cases, those subtasks will never got scheduled.
228-
# TODO We can use `_periodical_submit_task` to submit those subtasks.
229-
subtask_resources = [
230-
item.subtask.required_resource for item in submit_items.values()
231-
]
232-
apply_delays.append(
233-
self._slots_ref.apply_subtask_resources.delay(
234-
band, self._session_id, subtask_ids, subtask_resources
240+
def _load_items_to_submit():
241+
for band, limit in band_to_limit.items():
242+
band_limit = limit or (
243+
self._band_to_resource[band].num_cpus
244+
or self._band_to_resource[band].num_gpus
235245
)
236-
)
246+
task_queue = self._band_queues[band]
247+
submit_items = dict()
248+
while (
249+
self._ensure_top_item_valid(task_queue)
250+
and len(submit_items) < band_limit
251+
):
252+
item = heapq.heappop(task_queue)
253+
submit_items[item.subtask.subtask_id] = item
254+
255+
subtask_ids = list(submit_items)
256+
if not subtask_ids:
257+
continue
258+
259+
submitted_bands.append(band)
260+
submit_items_list.append(submit_items)
261+
262+
# Before hbo, when a manager finish a subtask, it will schedule one subtask successfully because
263+
# there is a slot idle. But now we have memory requirements, so the subtask may apply resource
264+
# from supervisor failed. In such cases, those subtasks will never got scheduled.
265+
# TODO We can use `_periodical_submit_task` to submit those subtasks.
266+
subtask_resources = [
267+
item.subtask.required_resource for item in submit_items.values()
268+
]
269+
apply_delays.append(
270+
self._slots_ref.apply_subtask_resources.delay(
271+
band, self._session_id, subtask_ids, subtask_resources
272+
)
273+
)
274+
275+
await asyncio.to_thread(_load_items_to_submit)
276+
277+
logger.debug("TMP_QUEUE_PROBE: Finished picking top subtasks")
237278

238279
async with redirect_subtask_errors(
239280
self,
240-
[
281+
(
241282
item.subtask
242283
for submit_items in submit_items_list
243284
for item in submit_items.values()
244-
],
285+
),
245286
):
246287
submitted_ids_list = await self._slots_ref.apply_subtask_resources.batch(
247288
*apply_delays
248289
)
249290

250-
for band, submit_items, submitted_ids in zip(
251-
submitted_bands, submit_items_list, submitted_ids_list
252-
):
253-
subtask_ids = list(submit_items)
254-
task_queue = self._band_queues[band]
291+
logger.debug(
292+
"TMP_QUEUE_PROBE: Finished band resource allocation, %d subtasks submitted",
293+
sum(len(ids) for ids in submitted_ids_list),
294+
)
255295

256-
async with redirect_subtask_errors(
257-
self, [item.subtask for item in submit_items.values()]
296+
manager_ref = await self._get_manager_ref()
297+
submit_delays = []
298+
299+
def _gather_submissions():
300+
for band, submit_items, submitted_ids in zip(
301+
submitted_bands, submit_items_list, submitted_ids_list
258302
):
259-
non_submitted_ids = [k for k in submit_items if k not in submitted_ids]
303+
subtask_ids = list(submit_items)
304+
task_queue = self._band_queues[band]
305+
submitted_id_set = set(submitted_ids)
306+
307+
non_submitted_ids = [
308+
k for k in submit_items if k not in submitted_id_set
309+
]
260310
tags = {
261311
"session_id": self._session_id,
262312
"band": band[0] if band else "",
263313
}
264314
self._submitted_subtask_number.record(len(submitted_ids), tags)
265315
self._unsubmitted_subtask_number.record(len(non_submitted_ids), tags)
266-
if submitted_ids:
316+
317+
if not submitted_ids:
318+
if non_submitted_ids:
319+
logger.debug("No slots available on band %s", band)
320+
else:
267321
for stid in subtask_ids:
268-
if stid not in submitted_ids:
322+
if stid not in submitted_id_set:
269323
continue
270324
item = submit_items[stid]
271325
logger.debug("Submit subtask %r to band %r", item.subtask, band)
272-
submit_aio_tasks.append(
273-
asyncio.create_task(
274-
manager_ref.submit_subtask_to_band.tell(
275-
item.subtask.subtask_id, band
276-
)
326+
submit_delays.append(
327+
manager_ref.submit_subtask_to_band.delay(
328+
item.subtask.subtask_id, band
277329
)
278330
)
279-
await asyncio.sleep(0)
280331
self.remove_queued_subtasks([item.subtask.subtask_id])
281-
else:
282-
logger.debug("No slots available")
283332

284-
for stid in non_submitted_ids:
285-
# TODO if subtasks submit failed due to lacking memory/cpu/gpu resources, lower the priority so that
286-
# other subtasks can be submitted.
287-
heapq.heappush(task_queue, submit_items[stid])
333+
for stid in non_submitted_ids:
334+
# TODO if subtasks submit failed due to lacking memory/cpu/gpu resources, lower the priority so that
335+
# other subtasks can be submitted.
336+
heapq.heappush(task_queue, submit_items[stid])
337+
338+
await asyncio.to_thread(_gather_submissions)
288339

289-
if submit_aio_tasks:
290-
yield asyncio.gather(*submit_aio_tasks)
340+
logger.debug("TMP_QUEUE_PROBE: Start subtask submission in batch")
341+
await manager_ref.submit_subtask_to_band.batch(*submit_delays)
342+
logger.debug("TMP_QUEUE_PROBE: Finished subtask submission")
291343

292344
def _ensure_top_item_valid(self, task_queue):
293345
"""Clean invalid subtask item from the queue to ensure that when the queue is not empty,

mars/services/scheduling/supervisor/tests/test_assigner.py

Lines changed: 25 additions & 26 deletions
Original file line numberDiff line numberDiff line change
@@ -62,32 +62,31 @@ def get_all_bands(self, role=None, statuses=None):
6262
class FakeClusterAPI(ClusterAPI):
6363
@classmethod
6464
async def create(cls, address: str, **kw):
65-
dones, _ = await asyncio.wait(
66-
[
67-
mo.create_actor(
68-
SupervisorPeerLocatorActor,
69-
"fixed",
70-
address,
71-
uid=SupervisorPeerLocatorActor.default_uid(),
72-
address=address,
73-
),
74-
mo.create_actor(
75-
MockNodeInfoCollectorActor,
76-
with_gpu=kw.get("with_gpu", False),
77-
uid=NodeInfoCollectorActor.default_uid(),
78-
address=address,
79-
),
80-
mo.create_actor(
81-
NodeInfoUploaderActor,
82-
NodeRole.WORKER,
83-
interval=kw.get("upload_interval"),
84-
band_to_resource=kw.get("band_to_resource"),
85-
use_gpu=kw.get("use_gpu", False),
86-
uid=NodeInfoUploaderActor.default_uid(),
87-
address=address,
88-
),
89-
]
90-
)
65+
coros = [
66+
mo.create_actor(
67+
SupervisorPeerLocatorActor,
68+
"fixed",
69+
address,
70+
uid=SupervisorPeerLocatorActor.default_uid(),
71+
address=address,
72+
),
73+
mo.create_actor(
74+
MockNodeInfoCollectorActor,
75+
with_gpu=kw.get("with_gpu", False),
76+
uid=NodeInfoCollectorActor.default_uid(),
77+
address=address,
78+
),
79+
mo.create_actor(
80+
NodeInfoUploaderActor,
81+
NodeRole.WORKER,
82+
interval=kw.get("upload_interval"),
83+
band_to_resource=kw.get("band_to_resource"),
84+
use_gpu=kw.get("use_gpu", False),
85+
uid=NodeInfoUploaderActor.default_uid(),
86+
address=address,
87+
),
88+
]
89+
dones, _ = await asyncio.wait([asyncio.create_task(coro) for coro in coros])
9190

9291
for task in dones:
9392
try:

mars/services/scheduling/supervisor/tests/test_globalresource.py

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -67,11 +67,12 @@ async def test_global_resource(actor_pool):
6767
band, session_id, ["subtask1"], [Resource(num_cpus=1)]
6868
)
6969

70-
wait_coro = global_resource_ref.wait_band_idle(band)
71-
(done, pending) = await asyncio.wait([wait_coro], timeout=0.5)
70+
wait_task = asyncio.create_task(global_resource_ref.wait_band_idle(band))
71+
(done, pending) = await asyncio.wait([wait_task], timeout=0.5)
7272
assert not done
7373
await global_resource_ref.release_subtask_resource(band, session_id, "subtask0")
74-
(done, pending) = await asyncio.wait([wait_coro], timeout=0.5)
74+
wait_task = asyncio.create_task(global_resource_ref.wait_band_idle(band))
75+
(done, pending) = await asyncio.wait([wait_task], timeout=0.5)
7576
assert done
7677
assert band in await global_resource_ref.get_idle_bands(0)
7778
assert ["subtask1"] == await global_resource_ref.apply_subtask_resources(

0 commit comments

Comments
 (0)