Skip to content

Commit cfc2c83

Browse files
committed
Fix tests
1 parent 7ee3006 commit cfc2c83

File tree

11 files changed

+178
-154
lines changed

11 files changed

+178
-154
lines changed

mars/deploy/oscar/tests/test_local.py

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -185,6 +185,7 @@ def _wrap_original_deploy_band_resources(*args, **kwargs):
185185

186186

187187
@pytest.mark.asyncio
188+
@pytest.mark.skipif(vineyard is None, reason="vineyard not installed")
188189
async def test_vineyard_operators(create_cluster):
189190
param = create_cluster[1]
190191
if param != "vineyard":

mars/services/scheduling/api/oscar.py

Lines changed: 14 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -96,7 +96,10 @@ async def update_subtask_priority(self, args_list, kwargs_list):
9696
)
9797

9898
async def cancel_subtasks(
99-
self, subtask_ids: List[str], kill_timeout: Union[float, int] = None
99+
self,
100+
subtask_ids: List[str],
101+
kill_timeout: Union[float, int] = None,
102+
wait: bool = False,
100103
):
101104
"""
102105
Cancel pending and running subtasks.
@@ -108,7 +111,14 @@ async def cancel_subtasks(
108111
kill_timeout
109112
timeout seconds to kill actor process forcibly
110113
"""
111-
await self._manager_ref.cancel_subtasks(subtask_ids, kill_timeout=kill_timeout)
114+
if wait:
115+
await self._manager_ref.cancel_subtasks(
116+
subtask_ids, kill_timeout=kill_timeout
117+
)
118+
else:
119+
await self._manager_ref.cancel_subtasks.tell(
120+
subtask_ids, kill_timeout=kill_timeout
121+
)
112122

113123
async def finish_subtasks(
114124
self,
@@ -122,8 +132,8 @@ async def finish_subtasks(
122132
123133
Parameters
124134
----------
125-
subtask_ids
126-
ids of subtasks to mark as finished
135+
subtask_results
136+
results of subtasks, must in finished states
127137
bands
128138
bands of subtasks to mark as finished
129139
schedule_next

mars/services/scheduling/supervisor/manager.py

Lines changed: 42 additions & 61 deletions
Original file line numberDiff line numberDiff line change
@@ -22,11 +22,9 @@
2222
from .... import oscar as mo
2323
from ....lib.aio import alru_cache
2424
from ....metrics import Metrics
25-
from ....oscar.backends.context import ProfilingContext
2625
from ....oscar.errors import MarsError
27-
from ....oscar.profiling import ProfilingData, MARS_ENABLE_PROFILING
2826
from ....typing import BandType
29-
from ....utils import dataslots, Timer
27+
from ....utils import dataslots
3028
from ...subtask import Subtask, SubtaskResult, SubtaskStatus
3129
from ...task import TaskAPI
3230
from ..core import SubtaskScheduleSummary
@@ -127,14 +125,6 @@ async def __post_create__(self):
127125
)
128126
await self._speculation_execution_scheduler.start()
129127

130-
async def dump_running():
131-
while True:
132-
if self._subtask_infos:
133-
logger.warning("RUNNING: %r", list(self._subtask_infos))
134-
await asyncio.sleep(5)
135-
136-
asyncio.create_task(dump_running())
137-
138128
async def __pre_destroy__(self):
139129
await self._speculation_execution_scheduler.stop()
140130

@@ -186,7 +176,7 @@ async def _handle_subtask_result(
186176
self, info: SubtaskScheduleInfo, result: SubtaskResult, band: BandType
187177
):
188178
subtask_id = info.subtask.subtask_id
189-
async with redirect_subtask_errors(self, [info.subtask]):
179+
async with redirect_subtask_errors(self, [info.subtask], reraise=False):
190180
try:
191181
info.band_futures[band].set_result(result)
192182
if result.error is not None:
@@ -262,9 +252,9 @@ async def finish_subtasks(
262252

263253
if subtask_info is not None:
264254
if subtask_band is not None:
265-
logger.warning("BEFORE await self._handle_subtask_result(subtask_info, result, subtask_band)")
266-
await self._handle_subtask_result(subtask_info, result, subtask_band)
267-
logger.warning("AFTER await self._handle_subtask_result(subtask_info, result, subtask_band)")
255+
await self._handle_subtask_result(
256+
subtask_info, result, subtask_band
257+
)
268258

269259
self._finished_subtask_count.record(
270260
1,
@@ -275,16 +265,15 @@ async def finish_subtasks(
275265
},
276266
)
277267
self._subtask_summaries[subtask_id] = subtask_info.to_summary(
278-
is_finished=True, is_cancelled=result.status == SubtaskStatus.cancelled
268+
is_finished=True,
269+
is_cancelled=result.status == SubtaskStatus.cancelled,
279270
)
280271
subtask_info.end_time = time.time()
281272
self._speculation_execution_scheduler.finish_subtask(subtask_info)
282273
# Cancel subtask on other bands.
283274
aio_task = subtask_info.band_futures.pop(subtask_band, None)
284275
if aio_task:
285-
logger.warning("BEFORE await aio_task")
286276
await aio_task
287-
logger.warning("AFTER await aio_task")
288277
if schedule_next:
289278
band_tasks[subtask_band] += 1
290279
if subtask_info.band_futures:
@@ -304,7 +293,6 @@ async def finish_subtasks(
304293
if schedule_next:
305294
for band in subtask_info.band_futures.keys():
306295
band_tasks[band] += 1
307-
# await self._queueing_ref.remove_queued_subtasks(subtask_ids)
308296
if band_tasks:
309297
await self._queueing_ref.submit_subtasks.tell(dict(band_tasks))
310298

@@ -345,7 +333,9 @@ async def batch_submit_subtask_to_band(self, args_list, kwargs_list):
345333
band_to_subtask_ids[band].append(subtask_id)
346334

347335
if res_release_delays:
348-
await self._global_resource_ref.release_subtask_resource.batch(*res_release_delays)
336+
await self._global_resource_ref.release_subtask_resource.batch(
337+
*res_release_delays
338+
)
349339

350340
for band, subtask_ids in band_to_subtask_ids.items():
351341
asyncio.create_task(self._submit_subtasks_to_band(band, subtask_ids))
@@ -386,29 +376,22 @@ async def cancel_subtasks(
386376
subtask_ids,
387377
kill_timeout,
388378
)
389-
queued_subtask_ids = []
390-
single_cancel_tasks = []
391379

392380
task_api = await self._get_task_api()
393381

394-
async def cancel_single_task(subtask, raw_tasks, cancel_tasks):
395-
if cancel_tasks:
396-
await asyncio.wait(cancel_tasks)
397-
if raw_tasks:
398-
dones, _ = await asyncio.wait(raw_tasks)
399-
else:
400-
dones = []
401-
if not dones or all(fut.cancelled() for fut in dones):
402-
await task_api.set_subtask_result(
403-
SubtaskResult(
404-
subtask_id=subtask.subtask_id,
405-
session_id=subtask.session_id,
406-
task_id=subtask.task_id,
407-
stage_id=subtask.stage_id,
408-
status=SubtaskStatus.cancelled,
409-
)
410-
)
382+
async def cancel_task_in_band(band):
383+
cancel_delays = band_to_cancel_delays.get(band) or []
384+
execution_ref = await self._get_execution_ref(band)
385+
if cancel_delays:
386+
await execution_ref.cancel_subtask.batch(*cancel_delays)
387+
band_futures = band_to_futures.get(band)
388+
if band_futures:
389+
await asyncio.wait(band_futures)
411390

391+
queued_subtask_ids = []
392+
cancel_tasks = []
393+
band_to_cancel_delays = defaultdict(list)
394+
band_to_futures = defaultdict(list)
412395
for subtask_id in subtask_ids:
413396
if subtask_id not in self._subtask_infos:
414397
# subtask may already finished or not submitted at all
@@ -423,35 +406,33 @@ async def cancel_single_task(subtask, raw_tasks, cancel_tasks):
423406
raw_tasks_to_cancel = list(info.band_futures.values())
424407

425408
if not raw_tasks_to_cancel:
426-
queued_subtask_ids.append(subtask_id)
427-
single_cancel_tasks.append(
428-
asyncio.create_task(
429-
cancel_single_task(info.subtask, [], [])
430-
)
409+
# not submitted yet: mark subtasks as cancelled
410+
result = SubtaskResult(
411+
subtask_id=info.subtask.subtask_id,
412+
session_id=info.subtask.session_id,
413+
task_id=info.subtask.task_id,
414+
stage_id=info.subtask.stage_id,
415+
status=SubtaskStatus.cancelled,
431416
)
417+
cancel_tasks.append(task_api.set_subtask_result(result))
418+
queued_subtask_ids.append(subtask_id)
432419
else:
433-
cancel_tasks = []
434-
for band in info.band_futures.keys():
420+
for band, future in info.band_futures.items():
435421
execution_ref = await self._get_execution_ref(band)
436-
cancel_tasks.append(
437-
asyncio.create_task(
438-
execution_ref.cancel_subtask(
439-
subtask_id, kill_timeout=kill_timeout
440-
)
441-
)
422+
band_to_cancel_delays[band].append(
423+
execution_ref.cancel_subtask.delay(subtask_id, kill_timeout)
442424
)
443-
single_cancel_tasks.append(
444-
asyncio.create_task(
445-
cancel_single_task(
446-
info.subtask, raw_tasks_to_cancel, cancel_tasks
447-
)
448-
)
449-
)
425+
band_to_futures[band].append(future)
426+
427+
for band in band_to_futures:
428+
cancel_tasks.append(asyncio.create_task(cancel_task_in_band(band)))
429+
450430
if queued_subtask_ids:
451431
# Don't use `finish_subtasks` because it may remove queued
452432
await self._queueing_ref.remove_queued_subtasks(queued_subtask_ids)
453-
if single_cancel_tasks:
454-
yield asyncio.wait(single_cancel_tasks)
433+
434+
if cancel_tasks:
435+
yield asyncio.gather(*cancel_tasks)
455436

456437
for subtask_id in subtask_ids:
457438
info = self._subtask_infos.pop(subtask_id, None)

mars/services/scheduling/supervisor/queueing.py

Lines changed: 1 addition & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -220,9 +220,7 @@ async def _submission_task_func(self):
220220

221221
async def _submit_subtask_request(self, band_to_limit: Dict[BandType, int] = None):
222222
if band_to_limit:
223-
logger.debug(
224-
"TMP_QUEUE_PROBE: Submitting subtasks with limits: %r", band_to_limit
225-
)
223+
logger.debug("Submitting subtasks with limits: %r", band_to_limit)
226224

227225
if not self._band_to_resource or any(
228226
not limit and band not in self._band_to_resource
@@ -274,8 +272,6 @@ def _load_items_to_submit():
274272

275273
await asyncio.to_thread(_load_items_to_submit)
276274

277-
logger.debug("TMP_QUEUE_PROBE: Finished picking top subtasks")
278-
279275
async with redirect_subtask_errors(
280276
self,
281277
(
@@ -288,11 +284,6 @@ def _load_items_to_submit():
288284
*apply_delays
289285
)
290286

291-
logger.debug(
292-
"TMP_QUEUE_PROBE: Finished band resource allocation, %d subtasks submitted",
293-
sum(len(ids) for ids in submitted_ids_list),
294-
)
295-
296287
manager_ref = await self._get_manager_ref()
297288
submit_delays = []
298289

@@ -336,10 +327,7 @@ def _gather_submissions():
336327
heapq.heappush(task_queue, submit_items[stid])
337328

338329
await asyncio.to_thread(_gather_submissions)
339-
340-
logger.debug("TMP_QUEUE_PROBE: Start subtask submission in batch")
341330
await manager_ref.submit_subtask_to_band.batch(*submit_delays)
342-
logger.debug("TMP_QUEUE_PROBE: Finished subtask submission")
343331

344332
def _ensure_top_item_valid(self, task_queue):
345333
"""Clean invalid subtask item from the queue to ensure that when the queue is not empty,

mars/services/scheduling/supervisor/tests/test_manager.py

Lines changed: 18 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,7 @@
1313
# limitations under the License.
1414

1515
import asyncio
16+
import time
1617
from collections import defaultdict
1718
from typing import List, Dict, Tuple, Set
1819

@@ -91,40 +92,33 @@ async def run_subtask(
9192

9293
async def task_fun():
9394
task_api = await TaskAPI.create(subtask.session_id, supervisor_address)
95+
result = SubtaskResult(
96+
subtask_id=subtask.subtask_id,
97+
session_id=subtask.session_id,
98+
task_id=subtask.task_id,
99+
stage_id=subtask.stage_id,
100+
bands=[(self.address, band_name)],
101+
progress=1.0,
102+
execution_start_time=time.time(),
103+
)
94104
try:
95105
await asyncio.sleep(20)
96106
except asyncio.CancelledError as ex:
97-
await task_api.set_subtask_result(
98-
SubtaskResult(
99-
subtask_id=subtask.subtask_id,
100-
session_id=subtask.session_id,
101-
task_id=subtask.task_id,
102-
stage_id=subtask.stage_id,
103-
bands=[(self.address, band_name)],
104-
status=SubtaskStatus.cancelled,
105-
progress=1.0,
106-
error=ex,
107-
traceback=ex.__traceback__,
108-
)
109-
)
107+
result.status = SubtaskStatus.cancelled
108+
result.error = ex
109+
result.traceback = ex.__traceback__
110+
await task_api.set_subtask_result(result)
110111
raise
111112
else:
112-
await task_api.set_subtask_result(
113-
SubtaskResult(
114-
subtask_id=subtask.subtask_id,
115-
session_id=subtask.session_id,
116-
task_id=subtask.task_id,
117-
stage_id=subtask.stage_id,
118-
status=SubtaskStatus.succeeded,
119-
bands=[(self.address, band_name)],
120-
progress=1.0,
121-
)
122-
)
113+
result.status = SubtaskStatus.succeeded
114+
result.execution_end_time = time.time()
115+
await task_api.set_subtask_result(result)
123116

124117
self._subtask_aiotasks[subtask.subtask_id][band_name] = asyncio.create_task(
125118
task_fun()
126119
)
127120

121+
@mo.extensible
128122
def cancel_subtask(self, subtask_id: str, kill_timeout: int = 5):
129123
for task in self._subtask_aiotasks[subtask_id].values():
130124
task.cancel()

mars/services/scheduling/tests/test_service.py

Lines changed: 1 addition & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -50,9 +50,7 @@ async def set_subtask_result(self, subtask_result: SubtaskResult):
5050
for event in self._events[subtask_result.subtask_id]:
5151
event.set()
5252
self._events.pop(subtask_result.subtask_id, None)
53-
await scheduling_api.finish_subtasks(
54-
[subtask_result], subtask_result.bands
55-
)
53+
await scheduling_api.finish_subtasks([subtask_result], subtask_result.bands)
5654

5755
def _return_result(self, subtask_id: str):
5856
result = self._results[subtask_id]

mars/services/scheduling/utils.py

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -29,7 +29,9 @@ async def _get_task_api(actor: mo.Actor):
2929

3030

3131
@contextlib.asynccontextmanager
32-
async def redirect_subtask_errors(actor: mo.Actor, subtasks: Iterable[Subtask]):
32+
async def redirect_subtask_errors(
33+
actor: mo.Actor, subtasks: Iterable[Subtask], reraise: bool = True
34+
):
3335
try:
3436
yield
3537
except: # noqa: E722 # pylint: disable=bare-except
@@ -60,4 +62,5 @@ async def redirect_subtask_errors(actor: mo.Actor, subtasks: Iterable[Subtask]):
6062
)
6163
tasks = [asyncio.ensure_future(coro) for coro in coros]
6264
await asyncio.wait(tasks)
63-
raise
65+
if reraise:
66+
raise

0 commit comments

Comments
 (0)