Fix shutdown errors on Python 3.8 #89

Merged · 4 commits · Apr 20, 2022

2 changes: 1 addition & 1 deletion .isort.cfg
@@ -1,2 +1,2 @@
 [settings]
-known_third_party = click,dask,distributed,mpi4py,pytest,requests,setuptools,tornado,yaml
+known_third_party = click,dask,distributed,mpi4py,pytest,requests,setuptools,yaml
2 changes: 1 addition & 1 deletion dask_mpi/__init__.py
@@ -1,5 +1,5 @@
 from ._version import get_versions
-from .core import initialize, send_close_signal
+from .core import finalize, initialize

 __version__ = get_versions()["version"]
 del get_versions
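
In downstream code the rename is picked up at the package root, so client scripts now import the helper like this (a one-line usage note, mirroring the test change further down):

    from dask_mpi import finalize, initialize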
15 changes: 3 additions & 12 deletions dask_mpi/core.py
@@ -5,8 +5,6 @@
 import dask
 from distributed import Client, Nanny, Scheduler
 from distributed.utils import import_term
-from tornado import gen
-from tornado.ioloop import IOLoop


 def initialize(
@@ -77,7 +75,6 @@ def initialize(
     comm = MPI.COMM_WORLD

     rank = comm.Get_rank()
-    loop = IOLoop.current()

     if not worker_options:
         worker_options = {}
@@ -108,7 +105,7 @@ async def run_scheduler():

     if rank == 1:
         if exit:
-            atexit.register(send_close_signal)
+            atexit.register(finalize)
         return True
     else:

@@ -138,7 +135,7 @@ async def run_worker():
         return False


-def send_close_signal():
+def finalize():
     """
     The client can call this function to explicitly stop
     the event loop.
@@ -150,11 +147,5 @@ def send_close_signal():
     in initialize.
     """

-    async def stop(dask_scheduler):
-        await dask_scheduler.close()
-        await gen.sleep(0.1)
-        local_loop = dask_scheduler.loop
-        local_loop.add_callback(local_loop.stop)
-
     with Client() as c:
-        c.run_on_scheduler(stop, wait=False)
+        c.shutdown()
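
For context, here is a minimal sketch of how a program would drive the renamed finalize(). It is not part of this diff; the launch command and the exit=False guard pattern are assumptions modeled on the test file below, relying only on what the diff shows (initialize() returns True on the client rank, and finalize() now delegates to Client.shutdown() instead of stopping a tornado IOLoop by hand):

    # Minimal usage sketch, not part of this diff. Run under MPI, e.g.:
    #   mpirun -np 4 python example.py   (launcher invocation is an assumption)
    from distributed import Client

    from dask_mpi import finalize, initialize

    # Rank 0 runs the scheduler and ranks 2+ run workers inside initialize();
    # per the diff above, only rank 1 (the client) gets True back.
    if initialize(exit=False):
        with Client() as c:
            assert c.submit(lambda x: x + 1, 41).result() == 42
        # finalize() is now just Client().shutdown(): the scheduler closes
        # itself and its workers, with no manual tornado IOLoop stop.
        finalize()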
4 changes: 2 additions & 2 deletions dask_mpi/tests/core_no_exit.py
@@ -1,7 +1,7 @@
 from distributed import Client
 from mpi4py.MPI import COMM_WORLD as world

-from dask_mpi import initialize, send_close_signal
+from dask_mpi import finalize, initialize

 # Split our MPI world into two pieces, one consisting just of
 # the old rank 3 process and the other with everything else
@@ -16,7 +16,7 @@
     with Client() as c:
         c.submit(lambda x: x + 1, 10).result() == 11
         c.submit(lambda x: x + 1, 20).result() == 21
-        send_close_signal()
+        finalize()

 # check that our original comm is intact
 world.Barrier()
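
For reference, this test is meant to be launched under an MPI runner with at least four ranks, since the script splits rank 3 into its own communicator. The exact invocation is an assumption, but something along these lines should work:

    mpirun -np 4 python dask_mpi/tests/core_no_exit.py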