Skip to content

Commit 5b240a8

Browse files
wjsi authored and Xuye (Chris) Qin committed
Callback on quota allocation failure & bump version (#516)
1 parent fb3a796 commit 5b240a8

File tree

4 files changed

+77
-17
lines changed

4 files changed

+77
-17
lines changed

mars/_version.py

Lines changed: 1 addition & 1 deletion
Original file line number | Diff line number | Diff line change
@@ -16,7 +16,7 @@
1616
import os
1717
import sys
1818

19-
version_info = (0, 2, 0, 'b1')
19+
version_info = (0, 2, 0, 'b2')
2020
_num_index = max(idx if isinstance(v, int) else 0
2121
for idx, v in enumerate(version_info))
2222
__version__ = '.'.join(map(str, version_info[:_num_index + 1])) + \

mars/scheduler/tests/test_graph.py

Lines changed: 23 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@
2323
from mars.scheduler.utils import SchedulerClusterInfoActor
2424
from mars.utils import serialize_graph, get_next_port
2525
from mars.actors import create_actor_pool
26+
from mars.graph import DAG
2627
from mars.tests.core import patch_method
2728

2829

@@ -176,6 +177,28 @@ def testGraphTermination(self, *_):
176177

177178
self.assertEqual(graph_ref.get_state(), GraphState.FAILED)
178179

180+
def testEmptyGraph(self, *_):
181+
session_id = str(uuid.uuid4())
182+
183+
addr = '127.0.0.1:%d' % get_next_port()
184+
with create_actor_pool(n_process=1, backend='gevent', address=addr) as pool:
185+
pool.create_actor(SchedulerClusterInfoActor, [pool.cluster_info.address],
186+
uid=SchedulerClusterInfoActor.default_uid())
187+
resource_ref = pool.create_actor(ResourceActor, uid=ResourceActor.default_uid())
188+
pool.create_actor(ChunkMetaActor, uid=ChunkMetaActor.default_uid())
189+
pool.create_actor(AssignerActor, uid=AssignerActor.default_uid())
190+
191+
resource_ref.set_worker_meta('localhost:12345', dict(hardware=dict(cpu_total=4)))
192+
resource_ref.set_worker_meta('localhost:23456', dict(hardware=dict(cpu_total=4)))
193+
194+
graph_key = str(uuid.uuid4())
195+
serialized_graph = serialize_graph(DAG())
196+
197+
graph_ref = pool.create_actor(GraphActor, session_id, graph_key, serialized_graph,
198+
uid=GraphActor.gen_uid(session_id, graph_key))
199+
graph_ref.execute_graph()
200+
self.assertEqual(graph_ref.get_state(), GraphState.SUCCEEDED)
201+
179202
def testErrorOnPrepare(self, *_):
180203
session_id = str(uuid.uuid4())
181204

mars/worker/quota.py

Lines changed: 24 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -14,10 +14,11 @@
1414

1515
import itertools
1616
import logging
17+
import sys
1718
import time
1819
from collections import namedtuple
1920

20-
from .. import resource
21+
from .. import resource, promise
2122
from ..compat import OrderedDict3
2223
from ..utils import log_unhandled
2324
from .utils import WorkerActor
@@ -62,6 +63,7 @@ def _log_allocate(self, msg, *args, **kwargs):
6263
args += (self._allocated_size, self._total_size)
6364
logger.debug(msg + ' Allocated: %s, Total size: %s', *args, **kwargs)
6465

66+
@promise.reject_on_exception
6567
@log_unhandled
6668
def request_batch_quota(self, batch, callback=None):
6769
"""
@@ -93,6 +95,7 @@ def request_batch_quota(self, batch, callback=None):
9395
return self._request_quota(keys, values, delta, callback, multiple=True,
9496
make_first=all_allocated)
9597

98+
@promise.reject_on_exception
9699
@log_unhandled
97100
def request_quota(self, key, quota_size, callback=None):
98101
"""
@@ -339,21 +342,28 @@ def _process_requests(self):
339342
removed = []
340343
for k, req in self._requests.items():
341344
req_size, delta, req_time, multiple, callbacks = req
342-
if self._has_space(delta):
343-
alter_allocation = self.alter_allocations if multiple else self.alter_allocation
344-
alter_allocation(k, req_size, handle_shrink=False)
345-
for cb in callbacks:
346-
self.tell_promise(cb)
347-
if self._status_ref:
348-
self._status_ref.update_mean_stats(
349-
'wait_time.' + self.uid.replace('Actor', ''), time.time() - req_time,
350-
_tell=True, _wait=False)
345+
try:
346+
if self._has_space(delta):
347+
alter_allocation = self.alter_allocations if multiple else self.alter_allocation
348+
alter_allocation(k, req_size, handle_shrink=False)
349+
for cb in callbacks:
350+
self.tell_promise(cb)
351+
if self._status_ref:
352+
self._status_ref.update_mean_stats(
353+
'wait_time.' + self.uid.replace('Actor', ''), time.time() - req_time,
354+
_tell=True, _wait=False)
355+
removed.append(k)
356+
else:
357+
# Quota left cannot satisfy the next request, we quit
358+
break
359+
except: # noqa: E722
351360
removed.append(k)
352-
else:
353-
# Quota left cannot satisfy the next request, we quit
354-
break
361+
# just in case the quota is allocated
362+
self.release_quota(k)
363+
for cb in callbacks:
364+
self.tell_promise(cb, *sys.exc_info(), **dict(_accept=False))
355365
for k in removed:
356-
del self._requests[k]
366+
self._requests.pop(k, None)
357367

358368

359369
class MemQuotaActor(QuotaActor):

mars/worker/tests/test_quota.py

Lines changed: 29 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,9 @@
2626

2727
class Test(WorkerCase):
2828
def testQuota(self):
29+
def _raiser(*_, **__):
30+
raise ValueError
31+
2932
local_pool_addr = 'localhost:%d' % get_next_port()
3033
with create_actor_pool(n_process=1, backend='gevent', address=local_pool_addr) as pool:
3134
pool.create_actor(WorkerClusterInfoActor, schedulers=[local_pool_addr],
@@ -68,10 +71,23 @@ def testQuota(self):
6871

6972
self.assertNotIn('2', quota_ref.dump_data().allocations)
7073

71-
ref.cancel_requests(('1',), reject_exc=build_exc_info(ValueError))
72-
with self.assertRaises(ValueError):
74+
ref.cancel_requests(('1',), reject_exc=build_exc_info(OSError))
75+
with self.assertRaises(OSError):
7376
self.get_result(5)
7477

78+
with patch_method(QuotaActor._request_quota, new=_raiser):
79+
ref.request_quota('err_raise', 1, _promise=True) \
80+
.catch(lambda *exc: test_actor.set_result(exc, accept=False))
81+
82+
with self.assertRaises(ValueError):
83+
self.get_result(5)
84+
85+
ref.request_batch_quota({'err_raise': 1}, _promise=True) \
86+
.catch(lambda *exc: test_actor.set_result(exc, accept=False))
87+
88+
with self.assertRaises(ValueError):
89+
self.get_result(5)
90+
7591
self.assertNotIn('1', quota_ref.dump_data().requests)
7692
self.assertIn('2', quota_ref.dump_data().allocations)
7793
self.assertNotIn('3', quota_ref.dump_data().allocations)
@@ -83,6 +99,17 @@ def testQuota(self):
8399
quota_ref.alter_allocations(['3'], [50])
84100
self.assertIn('4', quota_ref.dump_data().allocations)
85101

102+
with self.run_actor_test(pool) as test_actor:
103+
ref = test_actor.promise_ref(QuotaActor.default_uid())
104+
ref.request_quota('5', 50, _promise=True) \
105+
.catch(lambda *exc: test_actor.set_result(exc, accept=False))
106+
107+
with patch_method(QuotaActor.alter_allocation, new=_raiser):
108+
quota_ref.release_quota('2')
109+
110+
with self.assertRaises(ValueError):
111+
self.get_result(5)
112+
86113
def testQuotaAllocation(self):
87114
local_pool_addr = 'localhost:%d' % get_next_port()
88115
with create_actor_pool(n_process=1, backend='gevent', address=local_pool_addr) as pool:

0 commit comments

Comments
 (0)