Skip to content

Commit d814d7c

Browse files
committed
black
1 parent 782e7b7 commit d814d7c

File tree

3 files changed

+29
-15
lines changed

3 files changed

+29
-15
lines changed

mars/services/scheduling/supervisor/manager.py

+20-12
Original file line numberDiff line numberDiff line change
@@ -172,9 +172,7 @@ async def _get_execution_ref(self, band: BandType):
172172

173173
return await mo.actor_ref(SubtaskExecutionActor.default_uid(), address=band[0])
174174

175-
async def set_subtask_result(
176-
self, result: SubtaskResult, band: BandType
177-
):
175+
async def set_subtask_result(self, result: SubtaskResult, band: BandType):
178176
info = self._subtask_infos[result.subtask_id]
179177
subtask_id = info.subtask.subtask_id
180178
notify_task_service = True
@@ -346,28 +344,38 @@ async def batch_submit_subtask_to_band(self, args_list, kwargs_list):
346344
async def _submit_subtasks_to_band(self, band: BandType, subtask_ids: List[str]):
347345
execution_ref = await self._get_execution_ref(band)
348346
delays = []
347+
task_stage_count = defaultdict(lambda: 0)
349348

350349
async with redirect_subtask_errors(
351350
self, self._get_subtasks_by_ids(subtask_ids)
352351
):
353352
for subtask_id in subtask_ids:
354353
subtask_info = self._subtask_infos[subtask_id]
355354
subtask = subtask_info.subtask
356-
self._submitted_subtask_count.record(
357-
1,
358-
{
359-
"session_id": self._session_id,
360-
"task_id": subtask.task_id,
361-
"stage_id": subtask.stage_id,
362-
},
363-
)
364-
logger.debug("Start run subtask %s in band %s.", subtask_id, band)
355+
task_stage_count[(subtask.task_id, subtask.stage_id)] += 1
365356
delays.append(
366357
execution_ref.run_subtask.delay(subtask, band[1], self.address)
367358
)
368359
subtask_info.band_futures[band] = asyncio.Future()
369360
subtask_info.start_time = time.time()
370361
self._speculation_execution_scheduler.add_subtask(subtask_info)
362+
363+
for (task_id, stage_id), cnt in task_stage_count.items():
364+
self._submitted_subtask_count.record(
365+
cnt,
366+
{
367+
"session_id": self._session_id,
368+
"task_id": task_id,
369+
"stage_id": stage_id,
370+
},
371+
)
372+
373+
logger.debug(
374+
"Start run %d subtasks %r in band %s.",
375+
len(subtask_ids),
376+
subtask_ids,
377+
band,
378+
)
371379
await execution_ref.run_subtask.batch(*delays, send=False)
372380

373381
async def cancel_subtasks(

mars/services/scheduling/supervisor/tests/test_manager.py

+6-2
Original file line numberDiff line numberDiff line change
@@ -109,12 +109,16 @@ async def task_fun():
109109
result.status = SubtaskStatus.cancelled
110110
result.error = ex
111111
result.traceback = ex.__traceback__
112-
await manager_ref.set_subtask_result.tell(result, (self.address, band_name))
112+
await manager_ref.set_subtask_result.tell(
113+
result, (self.address, band_name)
114+
)
113115
raise
114116
else:
115117
result.status = SubtaskStatus.succeeded
116118
result.execution_end_time = time.time()
117-
await manager_ref.set_subtask_result.tell(result, (self.address, band_name))
119+
await manager_ref.set_subtask_result.tell(
120+
result, (self.address, band_name)
121+
)
118122

119123
self._subtask_aiotasks[subtask.subtask_id][band_name] = asyncio.create_task(
120124
task_fun()

mars/services/scheduling/worker/execution.py

+3-1
Original file line numberDiff line numberDiff line change
@@ -572,7 +572,9 @@ async def subtask_caller():
572572
manager_ref = await self._get_manager_ref(
573573
subtask.session_id, supervisor_address
574574
)
575-
await manager_ref.set_subtask_result.tell(res, (self.address, band_name))
575+
await manager_ref.set_subtask_result.tell(
576+
res, (self.address, band_name)
577+
)
576578
finally:
577579
self._subtask_info.pop(subtask_id, None)
578580
self._finished_subtask_count.record(1, {"band": self.address})

0 commit comments

Comments
 (0)