From e40dface85c4c47935105f7e9ecfdfab46157ac0 Mon Sep 17 00:00:00 2001 From: Kevin Paul Date: Wed, 20 Apr 2022 13:10:51 -0600 Subject: [PATCH 1/4] Use Client shutdown at exit --- dask_mpi/core.py | 9 +-------- 1 file changed, 1 insertion(+), 8 deletions(-) diff --git a/dask_mpi/core.py b/dask_mpi/core.py index 51bb8bb..b1a9313 100644 --- a/dask_mpi/core.py +++ b/dask_mpi/core.py @@ -5,7 +5,6 @@ import dask from distributed import Client, Nanny, Scheduler from distributed.utils import import_term -from tornado import gen from tornado.ioloop import IOLoop @@ -150,11 +149,5 @@ def send_close_signal(): in initialize. """ - async def stop(dask_scheduler): - await dask_scheduler.close() - await gen.sleep(0.1) - local_loop = dask_scheduler.loop - local_loop.add_callback(local_loop.stop) - with Client() as c: - c.run_on_scheduler(stop, wait=False) + c.shutdown() From bc2aebfd74e968a6c6d77bc3e955b96bcf447dec Mon Sep 17 00:00:00 2001 From: Kevin Paul Date: Wed, 20 Apr 2022 13:12:17 -0600 Subject: [PATCH 2/4] Cleanup unused imports and variables --- .isort.cfg | 2 +- dask_mpi/core.py | 2 -- 2 files changed, 1 insertion(+), 3 deletions(-) diff --git a/.isort.cfg b/.isort.cfg index 829deab..3f01d0d 100644 --- a/.isort.cfg +++ b/.isort.cfg @@ -1,2 +1,2 @@ [settings] -known_third_party = click,dask,distributed,mpi4py,pytest,requests,setuptools,tornado,yaml +known_third_party = click,dask,distributed,mpi4py,pytest,requests,setuptools,yaml diff --git a/dask_mpi/core.py b/dask_mpi/core.py index b1a9313..8ff6ccb 100644 --- a/dask_mpi/core.py +++ b/dask_mpi/core.py @@ -5,7 +5,6 @@ import dask from distributed import Client, Nanny, Scheduler from distributed.utils import import_term -from tornado.ioloop import IOLoop def initialize( @@ -76,7 +75,6 @@ def initialize( comm = MPI.COMM_WORLD rank = comm.Get_rank() - loop = IOLoop.current() if not worker_options: worker_options = {} From 7614249ffaf0ad14ae64b66851eb6ffce11a2260 Mon Sep 17 00:00:00 2001 From: Kevin Paul Date: Wed, 20 Apr 2022 13:13:23 -0600 Subject: [PATCH 3/4] Rename atexit handler for clarity --- dask_mpi/core.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/dask_mpi/core.py b/dask_mpi/core.py index 8ff6ccb..f10974e 100644 --- a/dask_mpi/core.py +++ b/dask_mpi/core.py @@ -105,7 +105,7 @@ async def run_scheduler(): if rank == 1: if exit: - atexit.register(send_close_signal) + atexit.register(shutdown_cluster) return True else: @@ -135,7 +135,7 @@ async def run_worker(): return False -def send_close_signal(): +def shutdown_cluster(): """ The client can call this function to explicitly stop the event loop. From 6eacd4e48b022ce3700a7f2767aab2adcba0045a Mon Sep 17 00:00:00 2001 From: Kevin Paul Date: Wed, 20 Apr 2022 13:16:14 -0600 Subject: [PATCH 4/4] Rename atexit handler --- dask_mpi/__init__.py | 2 +- dask_mpi/core.py | 4 ++-- dask_mpi/tests/core_no_exit.py | 4 ++-- 3 files changed, 5 insertions(+), 5 deletions(-) diff --git a/dask_mpi/__init__.py b/dask_mpi/__init__.py index ad7560e..ae8408e 100644 --- a/dask_mpi/__init__.py +++ b/dask_mpi/__init__.py @@ -1,5 +1,5 @@ from ._version import get_versions -from .core import initialize, send_close_signal +from .core import finalize, initialize __version__ = get_versions()["version"] del get_versions diff --git a/dask_mpi/core.py b/dask_mpi/core.py index f10974e..a241c88 100644 --- a/dask_mpi/core.py +++ b/dask_mpi/core.py @@ -105,7 +105,7 @@ async def run_scheduler(): if rank == 1: if exit: - atexit.register(shutdown_cluster) + atexit.register(finalize) return True else: @@ -135,7 +135,7 @@ async def run_worker(): return False -def shutdown_cluster(): +def finalize(): """ The client can call this function to explicitly stop the event loop. diff --git a/dask_mpi/tests/core_no_exit.py b/dask_mpi/tests/core_no_exit.py index fde7baf..9d45889 100644 --- a/dask_mpi/tests/core_no_exit.py +++ b/dask_mpi/tests/core_no_exit.py @@ -1,7 +1,7 @@ from distributed import Client from mpi4py.MPI import COMM_WORLD as world -from dask_mpi import initialize, send_close_signal +from dask_mpi import finalize, initialize # Split our MPI world into two pieces, one consisting just of # the old rank 3 process and the other with everything else @@ -16,7 +16,7 @@ with Client() as c: c.submit(lambda x: x + 1, 10).result() == 11 c.submit(lambda x: x + 1, 20).result() == 21 - send_close_signal() + finalize() # check that our original comm is intact world.Barrier()