-
Hi, All. I am trying to run kbatch jobs for an analysis I've been developing using a dask cluster on the MSPC Hub. Everything was going well (and, strangely, the current code I have even ran for a long time the first time I tried it), but then I started getting a cryptic string of error output that has taken a while to pick apart. I believe I have finally traced it down to the following MRE (using the real dependency I'm working with, called FyeldGenerator):

1.) open a bash terminal

2.) create test_module.py, containing only:

# import the dependency
import FyeldGenerator

3.) launch an interactive Python session

4.) run the following code, interactively:

import dask_gateway
from dask.distributed import PipInstall

# get client
gateway = dask_gateway.Gateway()
cluster = gateway.new_cluster()
cluster.scale(1)  # problem of course occurs with >1 worker, too
client = cluster.get_client()

# install dependency on worker(s)
plugin = PipInstall(packages=["FyeldGenerator"])
client.register_worker_plugin(plugin)

# check that worker install(s) succeed(s)
def test_clust_fn():
    import FyeldGenerator
    return FyeldGenerator.__file__

out = client.submit(test_clust_fn)
client.gather(out)
# correct path output suggests pkg is successfully installed

# upload the test module
client.upload_file('test_module.py')

5.) get the following error (NOTE: full traceback copy-pasted at the bottom of this message):
This raises two crucial questions for me:
I apologize if there's something obvious that I'm doing wrong or missing! I've looked through all the relevant docs I could think of, but I may just have gotten something confused? Thanks in advance!

full traceback:

---------------------------------------------------------------------------
ModuleNotFoundError Traceback (most recent call last)
Cell In[9], line 1
----> 1 client.upload_file('test_module.py')
File /srv/conda/envs/notebook/lib/python3.11/site-packages/distributed/client.py:3750, in Client.upload_file(self, filename, **kwargs)
3742 results = await asyncio.gather(
3743 self.register_scheduler_plugin(
3744 SchedulerUploadFile(filename), name=name
3745 ),
3746 self.register_worker_plugin(UploadFile(filename), name=name),
3747 )
3748 return results[1] # Results from workers upload
-> 3750 return self.sync(_)
File /srv/conda/envs/notebook/lib/python3.11/site-packages/distributed/utils.py:351, in SyncMethodMixin.sync(self, func, asynchronous, callback_timeout, *args, **kwargs)
349 return future
350 else:
--> 351 return sync(
352 self.loop, func, *args, callback_timeout=callback_timeout, **kwargs
353 )
File /srv/conda/envs/notebook/lib/python3.11/site-packages/distributed/utils.py:418, in sync(loop, func, callback_timeout, *args, **kwargs)
416 if error:
417 typ, exc, tb = error
--> 418 raise exc.with_traceback(tb)
419 else:
420 return result
File /srv/conda/envs/notebook/lib/python3.11/site-packages/distributed/utils.py:391, in sync.<locals>.f()
389 future = wait_for(future, callback_timeout)
390 future = asyncio.ensure_future(future)
--> 391 result = yield future
392 except Exception:
393 error = sys.exc_info()
File /srv/conda/envs/notebook/lib/python3.11/site-packages/tornado/gen.py:767, in Runner.run(self)
765 try:
766 try:
--> 767 value = future.result()
768 except Exception as e:
769 # Save the exception for later. It's important that
770 # gen.throw() not be called inside this try/except block
771 # because that makes sys.exc_info behave unexpectedly.
772 exc: Optional[Exception] = e
File /srv/conda/envs/notebook/lib/python3.11/site-packages/distributed/client.py:3742, in Client.upload_file.<locals>._()
3741 async def _():
-> 3742 results = await asyncio.gather(
3743 self.register_scheduler_plugin(
3744 SchedulerUploadFile(filename), name=name
3745 ),
3746 self.register_worker_plugin(UploadFile(filename), name=name),
3747 )
3748 return results[1]
File /srv/conda/envs/notebook/lib/python3.11/site-packages/distributed/client.py:4778, in Client._register_scheduler_plugin(self, plugin, name, idempotent)
4777 async def _register_scheduler_plugin(self, plugin, name, idempotent=False):
-> 4778 return await self.scheduler.register_scheduler_plugin(
4779 plugin=dumps(plugin),
4780 name=name,
4781 idempotent=idempotent,
4782 )
File /srv/conda/envs/notebook/lib/python3.11/site-packages/distributed/core.py:1359, in PooledRPCCall.__getattr__.<locals>.send_recv_from_rpc(**kwargs)
1357 prev_name, comm.name = comm.name, "ConnectionPool." + key
1358 try:
-> 1359 return await send_recv(comm=comm, op=key, **kwargs)
1360 finally:
1361 self.pool.reuse(self.addr, comm)
File /srv/conda/envs/notebook/lib/python3.11/site-packages/distributed/core.py:1143, in send_recv(comm, reply, serializers, deserializers, **kwargs)
1141 _, exc, tb = clean_exception(**response)
1142 assert exc
-> 1143 raise exc.with_traceback(tb)
1144 else:
1145 raise Exception(response["exception_text"])
File /srv/conda/envs/notebook/lib/python3.11/site-packages/distributed/core.py:922, in _handle_comm()
920 result = handler(**msg)
921 if inspect.iscoroutine(result):
--> 922 result = await result
923 elif inspect.isawaitable(result):
924 raise RuntimeError(
925 f"Comm handler returned unknown awaitable. Expected coroutine, instead got {type(result)}"
926 )
File /srv/conda/envs/notebook/lib/python3.11/site-packages/distributed/scheduler.py:5699, in register_scheduler_plugin()
5697 result = plugin.start(self)
5698 if inspect.isawaitable(result):
-> 5699 await result
5701 self.add_plugin(plugin, name=name, idempotent=idempotent)
File /srv/conda/envs/notebook/lib/python3.11/site-packages/distributed/diagnostics/plugin.py:326, in start()
325 async def start(self, scheduler: Scheduler) -> None:
--> 326 await scheduler.upload_file(self.filename, self.data)
File /srv/conda/envs/notebook/lib/python3.11/site-packages/distributed/core.py:523, in upload_file()
521 except Exception as e:
522 logger.exception(e)
--> 523 raise e
525 return {"status": "OK", "nbytes": len(data)}
File /srv/conda/envs/notebook/lib/python3.11/site-packages/distributed/core.py:519, in upload_file()
517 if load:
518 try:
--> 519 import_file(out_filename)
520 cache_loads.data.clear()
521 except Exception as e:
File /srv/conda/envs/notebook/lib/python3.11/site-packages/distributed/utils.py:1100, in import_file()
1098 for name in names_to_import:
1099 logger.info("Reload module %s from %s file", name, ext)
-> 1100 loaded.append(importlib.reload(importlib.import_module(name)))
1101 finally:
1102 if tmp_python_path is not None:
File /srv/conda/envs/notebook/lib/python3.11/importlib/__init__.py:126, in import_module()
124 break
125 level += 1
--> 126 return _bootstrap._gcd_import(name[level:], package, level)
File <frozen importlib._bootstrap>:1204, in _gcd_import()
File <frozen importlib._bootstrap>:1176, in _find_and_load()
File <frozen importlib._bootstrap>:1147, in _find_and_load_unlocked()
File <frozen importlib._bootstrap>:690, in _load_unlocked()
File <frozen importlib._bootstrap_external>:940, in exec_module()
File <frozen importlib._bootstrap>:241, in _call_with_frames_removed()
File /tmp/dask-scratch-space/scheduler-q136p4zn/test_module.py:1
ModuleNotFoundError: No module named 'FyeldGenerator'
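A quick way to double-check whether the package is importable everywhere on the cluster is something like the following sketch (reusing the client from the MRE above; has_fyeld is just an illustrative helper, not part of my actual code):

import importlib.util

def has_fyeld():
    # True if the process running this function can import FyeldGenerator
    return importlib.util.find_spec("FyeldGenerator") is not None

client.run_on_scheduler(has_fyeld)  # check the scheduler's environment
client.run(has_fyeld)               # check every worker (dict keyed by worker address)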
-
In https://docs.dask.org/en/stable/futures.html?highlight=upload_file#distributed.Client.upload_file, the docs describe the file being uploaded to the scheduler as well as the workers.
This one I'm not too sure about... Dynamically installing / loading Python modules is always a bit tricky. Can you check where that code is executing? I see /tmp/dask-scratch-space/scheduler-q136p4zn/test_module.py in the traceback.
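For example, something along these lines (just a sketch, reusing your existing client object) would report the host, working directory, and interpreter for the scheduler and each worker:

import os
import socket
import sys

def where_am_i():
    # report where this function actually runs
    return {"host": socket.gethostname(), "cwd": os.getcwd(), "exe": sys.executable}

client.run_on_scheduler(where_am_i)  # the scheduler process
client.run(where_am_i)               # every worker, keyed by address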
-
Hi, Tom. Thanks! This is a huge help. For reference, here is the upload_file docstring as installed on the Hub:
>>> dask.distributed.Client.upload_file?
Signature: dask.distributed.Client.upload_file(self, filename, **kwargs)
Docstring:
Upload local package to workers
This sends a local file up to all worker nodes. This file is placed
into the working directory of the running worker, see config option
``temporary-directory`` (defaults to :py:func:`tempfile.gettempdir`).
This directory will be added to the Python's system path so any .py,
.egg or .zip files will be importable.
Parameters
----------
filename : string
Filename of .py, .egg or .zip file to send to workers
**kwargs : dict
Optional keyword arguments for the function
Examples
--------
>>> client.upload_file('mylibrary.egg') # doctest: +SKIP
>>> from mylibrary import myfunc # doctest: +SKIP
>>> L = client.map(myfunc, seq) # doctest: +SKIP
File: /srv/conda/envs/notebook/lib/python3.11/site-packages/distributed/client.py
Type:      function

I wondered if the more explicit docs on the dask website were introduced in a version between what's currently on the MSPC Hub and the latest release. At any rate, I suspect that the version issue may be the problem, because I get an identical error and traceback even when I provide

I tried to update by running

/srv/conda/envs/notebook/lib/python3.11/site-packages/distributed/client.py:1388: VersionMismatchWarning: Mismatched versions found
+---------+----------+-----------+---------+
| Package | Client | Scheduler | Workers |
+---------+----------+-----------+---------+
| dask | 2023.7.0 | 2023.5.0 | None |
+---------+----------+-----------+---------+
  warnings.warn(version_module.VersionMismatchWarning(msg[0]["warning"]))

I have tried running

Curious to hear what I should do next!
To answer your question about where the code is executing, here is what I see.

on the main node:

>>> import os, sys
>>> os.getcwd()
'/home/jovyan/avoided_forest_conversion/analysis'
>>> sys.path
['/srv/conda/envs/notebook/bin',
'/srv/conda/envs/notebook/lib/python311.zip',
'/srv/conda/envs/notebook/lib/python3.11',
'/srv/conda/envs/notebook/lib/python3.11/lib-dynload',
'',
'/srv/conda/envs/notebook/lib/python3.11/site-packages']
>>> sys.executable
'/srv/conda/envs/notebook/bin/python3.11'

on a worker:

>>> def test_clust_fn():
...     import os, sys
...     return (os.path, sys.path, sys.executable)
>>> out = client.submit(test_clust_fn)
>>> client.gather(out)
(<module 'posixpath' (frozen)>,
['/tmp/dask-scratch-space/worker-n44nje0_',
'/tmp/dask-scratch-space',
'/srv/conda/envs/notebook/bin',
'/srv/conda/envs/notebook/lib/python311.zip',
'/srv/conda/envs/notebook/lib/python3.11',
'/srv/conda/envs/notebook/lib/python3.11/lib-dynload',
'/srv/conda/envs/notebook/lib/python3.11/site-packages'],
 '/srv/conda/envs/notebook/bin/python3.11')
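In case it's useful, here's a sketch of how I can also dump the full package comparison across the client, scheduler, and workers (same client object as above):

# collect version info from the client, the scheduler, and every worker
versions = client.get_versions()
versions["client"]["packages"]["dask"]      # e.g. the client-side dask version
versions["scheduler"]["packages"]["dask"]   # e.g. the scheduler-side dask version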
Hi, Tom.
Thanks so much! This all makes sense. Sounds like I'm bumping into an awkward dependency limitation of running analysis on MSPC. That's an inevitable consequence of relying on a free and general-purpose platform for a specific project.
I considered moving onto our own compute, and that certainly remains an option for later on. However, I stepped back even further and realized that this all really boiled down to needing just one function from one package (FyeldGenerator). I took a look at the source code and it is remarkably short and sweet. Thus, in the end, I just cannibalized that single function directly into my scripts, removing the need for pip inst…
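Roughly, the resulting pattern looks like this (just a sketch; my_field is a hypothetical stand-in, not the real FyeldGenerator code). Because the function is defined client-side, Dask serializes it and ships it to the workers with each task, so nothing extra needs to be installed on the cluster:

import numpy as np

def my_field(shape, seed=0):
    # hypothetical stand-in for the single vendored function;
    # defined locally, so it travels to the workers inside the task graph
    rng = np.random.default_rng(seed)
    return rng.normal(size=shape)

fut = client.submit(my_field, (256, 256), seed=42)
field = client.gather(fut)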