Skip to content

Commit e798aed

Browse files
authored
Simplify router cleanup when pools or clusters end (#3086)
1 parent 67d392b commit e798aed

Some content is hidden

Large commits have some content hidden by default. Use the search box below to find content that may be hidden.

45 files changed, with 78 additions and 169 deletions.

MANIFEST.in

Lines changed: 1 addition & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -13,3 +13,4 @@ include mars/services/web/static/*
1313
global-exclude .DS_Store
1414
include versioneer.py
1515
include mars/_version.py
16+
global-exclude conftest.py

benchmarks/tpch/gen_data.py

Lines changed: 1 addition & 1 deletion
Original file line number | Diff line number | Diff line change
@@ -26,7 +26,6 @@
2626
import shutil
2727
import subprocess
2828
from multiprocessing import Pool, set_start_method
29-
import pandas as pd
3029
import pyarrow.parquet as pq
3130

3231

@@ -44,6 +43,7 @@
4443
# Change location of tpch-dbgen if not in same place as this script
4544
tpch_dbgen_location = "./tpch-dbgen"
4645

46+
4747
# First element is the table single character short-hand understood by dbgen
4848
# Second element is the number of pieces we want the parquet dataset to have for that table
4949
# Third element is the function that reads generated CSV to a pandas dataframe

benchmarks/tpch/run_queries.py

Lines changed: 2 additions & 3 deletions
Original file line number | Diff line number | Diff line change
@@ -301,7 +301,6 @@ def q03(lineitem, orders, customer):
301301

302302
@tpc_query
303303
def q04(lineitem, orders):
304-
t1 = time.time()
305304
date1 = md.Timestamp("1993-11-01")
306305
date2 = md.Timestamp("1993-08-01")
307306
lsel = lineitem.L_COMMITDATE < lineitem.L_RECEIPTDATE
@@ -617,7 +616,7 @@ def g2(x):
617616
def q13(customer, orders):
618617
customer_filtered = customer.loc[:, ["C_CUSTKEY"]]
619618
orders_filtered = orders[
620-
~orders["O_COMMENT"].str.contains("special[\S|\s]*requests")
619+
~orders["O_COMMENT"].str.contains(r"special[\S|\s]*requests")
621620
]
622621
orders_filtered = orders_filtered.loc[:, ["O_ORDERKEY", "O_CUSTKEY"]]
623622
c_o_merged = customer_filtered.merge(
@@ -696,7 +695,7 @@ def q16(part, partsupp, supplier):
696695
)
697696
total = total.loc[:, ["P_BRAND", "P_TYPE", "P_SIZE", "PS_SUPPKEY"]]
698697
supplier_filtered = supplier[
699-
supplier["S_COMMENT"].str.contains("Customer(\S|\s)*Complaints")
698+
supplier["S_COMMENT"].str.contains(r"Customer(\S|\s)*Complaints")
700699
]
701700
supplier_filtered = supplier_filtered.loc[:, ["S_SUPPKEY"]].drop_duplicates()
702701
# left merge to select only PS_SUPPKEY values not in supplier_filtered

mars/conftest.py

Lines changed: 14 additions & 5 deletions
Original file line number | Diff line number | Diff line change
@@ -36,6 +36,16 @@ def auto_cleanup(request):
3636
request.addfinalizer(clear_all_alru_caches)
3737

3838

39+
@pytest.fixture(scope="module", autouse=True)
40+
def check_router_cleaned(request):
41+
def route_checker():
42+
if Router.get_instance() is not None:
43+
assert len(Router.get_instance()._mapping) == 0
44+
assert len(Router.get_instance()._local_mapping) == 0
45+
46+
request.addfinalizer(route_checker)
47+
48+
3949
@pytest.fixture(scope="module")
4050
def ray_start_regular_shared(request): # pragma: no cover
4151
yield from _ray_start_regular(request)
@@ -144,7 +154,7 @@ def stop_ray(request): # pragma: no cover
144154

145155

146156
@pytest.fixture
147-
async def ray_create_mars_cluster(request):
157+
async def ray_create_mars_cluster(request, check_router_cleaned):
148158
from mars.deploy.oscar.ray import new_cluster, _load_config
149159

150160
ray_config = _load_config()
@@ -177,11 +187,10 @@ def stop_mars():
177187
import mars
178188

179189
mars.stop_server()
180-
Router.set_instance(None)
181190

182191

183192
@pytest.fixture(scope="module")
184-
def _new_test_session():
193+
def _new_test_session(check_router_cleaned):
185194
from .deploy.oscar.tests.session import new_test_session
186195

187196
sess = new_test_session(
@@ -200,7 +209,7 @@ def _new_test_session():
200209

201210

202211
@pytest.fixture(scope="module")
203-
def _new_integrated_test_session():
212+
def _new_integrated_test_session(check_router_cleaned):
204213
from .deploy.oscar.tests.session import new_test_session
205214

206215
sess = new_test_session(
@@ -234,7 +243,7 @@ def _new_integrated_test_session():
234243

235244

236245
@pytest.fixture(scope="module")
237-
def _new_gpu_test_session(): # pragma: no cover
246+
def _new_gpu_test_session(check_router_cleaned): # pragma: no cover
238247
from .deploy.oscar.tests.session import new_test_session
239248
from .resource import cuda_count
240249

mars/dataframe/contrib/raydataset/tests/test_mldataset.py

Lines changed: 2 additions & 6 deletions
Original file line number | Diff line number | Diff line change
@@ -20,7 +20,6 @@
2020
from ..... import dataframe as md
2121
from .....deploy.oscar.ray import new_cluster
2222
from .....deploy.oscar.session import new_session
23-
from .....oscar.backends.router import Router
2423
from .....tests.core import require_ray
2524
from .....utils import lazy_import
2625
from ....contrib import raydataset as mdd
@@ -47,11 +46,8 @@ async def create_cluster(request):
4746
worker_cpu=1,
4847
worker_mem=256 * 1024**2,
4948
)
50-
try:
51-
async with client:
52-
yield client
53-
finally:
54-
Router.set_instance(None)
49+
async with client:
50+
yield client
5551

5652

5753
@require_ray

mars/dataframe/contrib/raydataset/tests/test_raydataset.py

Lines changed: 2 additions & 6 deletions
Original file line number | Diff line number | Diff line change
@@ -20,7 +20,6 @@
2020
from ..... import dataframe as md
2121
from .....deploy.oscar.ray import new_cluster
2222
from .....deploy.oscar.session import new_session
23-
from .....oscar.backends.router import Router
2423
from .....tests.core import require_ray
2524
from .....utils import lazy_import
2625
from ....contrib import raydataset as mdd
@@ -45,11 +44,8 @@ async def create_cluster(request):
4544
worker_cpu=1,
4645
worker_mem=256 * 1024**2,
4746
)
48-
try:
49-
async with client:
50-
yield client
51-
finally:
52-
Router.set_instance(None)
47+
async with client:
48+
yield client
5349

5450

5551
@require_ray

mars/deploy/oscar/local.py

Lines changed: 3 additions & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -25,6 +25,7 @@
2525
from ... import oscar as mo
2626
from ...core.entrypoints import init_extension_entrypoints
2727
from ...lib.aio import get_isolation, stop_isolation
28+
from ...oscar.backends.router import Router
2829
from ...resource import cpu_count, cuda_count, mem_total
2930
from ...services import NodeRole
3031
from ...services.task.execution.api import ExecutionConfig
@@ -128,6 +129,7 @@ async def stop_cluster(cluster: ClusterType):
128129
isolation = get_isolation()
129130
coro = cluster.stop()
130131
await asyncio.wrap_future(asyncio.run_coroutine_threadsafe(coro, isolation.loop))
132+
Router.set_instance(None)
131133

132134

133135
class LocalCluster:
@@ -287,6 +289,7 @@ async def stop(self):
287289
await self._supervisor_pool.stop()
288290
AbstractSession.reset_default()
289291
self._exiting_check_task.cancel()
292+
Router.set_instance(None)
290293

291294

292295
class LocalClient:

mars/deploy/oscar/ray.py

Lines changed: 2 additions & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -29,6 +29,7 @@
2929
process_address_to_placement,
3030
)
3131
from ...oscar.backends.ray.pool import RayPoolState
32+
from ...oscar.backends.router import Router
3233
from ...oscar.errors import ReconstructWorkerError
3334
from ...resource import Resource
3435
from ...services.cluster.backends.base import (
@@ -569,6 +570,7 @@ async def stop(self):
569570
finally:
570571
AbstractSession.reset_default()
571572
RayActorDriver.stop_cluster()
573+
Router.set_instance(None)
572574
self._stopped = True
573575

574576

mars/deploy/oscar/tests/test_fault_injection.py

Lines changed: 2 additions & 6 deletions
Original file line number | Diff line number | Diff line change
@@ -20,7 +20,6 @@
2020

2121
from .... import dataframe as md
2222
from .... import tensor as mt
23-
from ....oscar.backends.router import Router
2423
from ....oscar.errors import ServerClosed
2524
from ....remote import spawn
2625
from ....services.tests.fault_injection_manager import (
@@ -51,11 +50,8 @@ async def fault_cluster(request):
5150
n_worker=2,
5251
n_cpu=2,
5352
)
54-
try:
55-
async with client:
56-
yield client
57-
finally:
58-
Router.set_instance(None)
53+
async with client:
54+
yield client
5955

6056

6157
async def create_fault_injection_manager(

mars/deploy/oscar/tests/test_local.py

Lines changed: 6 additions & 14 deletions
Original file line number | Diff line number | Diff line change
@@ -35,7 +35,6 @@
3535
from ....config import option_context
3636
from ....core.context import get_context
3737
from ....lib.aio import new_isolation
38-
from ....oscar.backends.router import Router
3938
from ....storage import StorageLevel
4039
from ....services.storage import StorageAPI
4140
from ....tensor.arithmetic.add import TensorAdd
@@ -120,13 +119,10 @@ async def create_cluster(request):
120119
n_cpu=2,
121120
use_uvloop=False,
122121
)
123-
try:
124-
async with client:
125-
if request.param == "default":
126-
assert client.session.client is not None
127-
yield client, request.param
128-
finally:
129-
Router.set_instance(None)
122+
async with client:
123+
if request.param == "default":
124+
assert client.session.client is not None
125+
yield client, request.param
130126

131127

132128
def _assert_storage_cleaned(session_id: str, addr: str, level: StorageLevel):
@@ -649,7 +645,6 @@ def setup_session(request):
649645
yield session
650646
finally:
651647
session.stop_server()
652-
Router.set_instance(None)
653648

654649

655650
def test_decref(setup_session):
@@ -892,11 +887,8 @@ async def speculative_cluster():
892887
n_cpu=10,
893888
use_uvloop=False,
894889
)
895-
try:
896-
async with client:
897-
yield client
898-
finally:
899-
Router.set_instance(None)
890+
async with client:
891+
yield client
900892

901893

902894
@pytest.mark.timeout(timeout=500)

0 commit comments

Comments
 (0)