Description
A blocking get issued inside a remote function fails if the target worker sends an operation back to the worker that issued the get. For example, if worker_A sends a get request to worker_B, the get fails when worker_B sends some operation to worker_A before processing the get request.
Triage Details
In the code snippet provided, the blocking get in func2, i.e. unidist.get(func1.remote()), waits for data from func1 (with a comm.recv), but instead receives the operation type (common.Operation.EXECUTE). Because of this mismatch between send and recv, the async_wrap call in loop.py later receives the data that was intended for the blocking get().
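The mismatch can be illustrated with a toy model that does not use MPI or unidist at all. This is only a sketch under stated assumptions: Mailbox, recv_any, recv_tagged, simulate and the message layout are hypothetical names made up for illustration, and EXECUTE = 1 is a stand-in for common.Operation.EXECUTE, not its real value. It shows what happens when a receive takes whatever message arrives next, compared with a receive that filters by source and tag.

```python
from collections import deque

EXECUTE = 1  # hypothetical stand-in for common.Operation.EXECUTE


class Mailbox:
    """Toy inbox of one worker; each message is (source, tag, payload)."""

    def __init__(self):
        self._messages = deque()

    def send(self, source, tag, payload):
        self._messages.append((source, tag, payload))

    def recv_any(self):
        """Return the payload of the next message, whatever its tag."""
        return self._messages.popleft()[2]

    def recv_tagged(self, source, tag):
        """Return the payload of the first message matching source and tag."""
        for index, (msg_source, msg_tag, payload) in enumerate(self._messages):
            if msg_source == source and msg_tag == tag:
                del self._messages[index]
                return payload
        raise LookupError("no matching message queued")


def simulate(use_tags):
    """Return (what the blocking get received, what the worker loop received)."""
    inbox_a = Mailbox()
    # worker_B sends an operation to worker_A before answering the get request.
    inbox_a.send("worker_B", "operation", EXECUTE)
    inbox_a.send("worker_B", "data", {"id": 7, "value": 1})

    if use_tags:
        get_result = inbox_a.recv_tagged("worker_B", "data")
    else:
        get_result = inbox_a.recv_any()
    # The worker loop then picks up whatever message is left.
    loop_message = inbox_a.recv_any()
    return get_result, loop_message


mismatched = simulate(use_tags=False)
matched = simulate(use_tags=True)
print("untagged:", mismatched)
print("tagged:  ", matched)
```

In the untagged run the get ends up with an int and the loop ends up with a dict, which is the same shape of mix-up as the two TypeErrors reported in this issue. Filtering by source and tag is shown only as one possible direction; whether it fits the unidist MPI backend is an open question.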
The issue can be reproduced with the code below.
import unidist

unidist.init()

@unidist.remote
def func0():
    return 1

@unidist.remote
def func1():
    func0.remote()
    return 1

@unidist.remote
def func2():
    unidist.get(func1.remote())
    return 1

result = unidist.get(func2.remote())
Error:
    result = unidist.get(func2.remote())
  File "/localdisk/arunjose/code/unidist/unidist/api.py", line 160, in get
    return execution_backend.get(object_refs)
  File "/localdisk/arunjose/code/unidist/unidist/core/base/backend.py", line 286, in get
    return self._backend_cls.get(object_refs)
  File "/localdisk/arunjose/code/unidist/unidist/core/backends/mpi/backend.py", line 79, in get
    return mpi.get(data_ids)
  File "/localdisk/arunjose/code/unidist/unidist/core/backends/mpi/core/controller/api.py", line 270, in get
    values = [get_impl(data_id) for data_id in data_ids]
  File "/localdisk/arunjose/code/unidist/unidist/core/backends/mpi/core/controller/api.py", line 270, in <listcomp>
    values = [get_impl(data_id) for data_id in data_ids]
  File "/localdisk/arunjose/code/unidist/unidist/core/backends/mpi/core/controller/api.py", line 260, in get_impl
    raise value
TypeError: 'int' object is not subscriptable
Traceback (most recent call last):
  File "<string>", line 1, in <module>
  File "/localdisk/arunjose/code/unidist/unidist/api.py", line 92, in init
    init_backend()
  File "/localdisk/arunjose/code/unidist/unidist/core/base/utils.py", line 43, in init_backend
    initialize_mpi()
  File "/localdisk/arunjose/code/unidist/unidist/core/backends/mpi/utils.py", line 12, in initialize_mpi
    init()
  File "/localdisk/arunjose/code/unidist/unidist/core/backends/mpi/core/controller/api.py", line 156, in init
    asyncio.run(worker_loop())
  File "/localdisk/arunjose/miniconda3/envs/modin_on_unidist_2/lib/python3.8/asyncio/runners.py", line 44, in run
    return loop.run_until_complete(main)
  File "/localdisk/arunjose/miniconda3/envs/modin_on_unidist_2/lib/python3.8/asyncio/base_events.py", line 616, in run_until_complete
    return future.result()
  File "/localdisk/arunjose/code/unidist/unidist/core/backends/mpi/core/worker/loop.py", line 90, in worker_loop
    operation_type, source_rank = await async_wrap(
  File "/localdisk/arunjose/code/unidist/unidist/core/backends/mpi/core/worker/loop.py", line 63, in run
    return await loop.run_in_executor(executor, pfunc)
  File "/localdisk/arunjose/miniconda3/envs/modin_on_unidist_2/lib/python3.8/concurrent/futures/thread.py", line 57, in run
    result = self.fn(*self.args, **self.kwargs)
  File "/localdisk/arunjose/code/unidist/unidist/core/backends/mpi/core/communication.py", line 323, in recv_operation_type
    log_operation(op_type, status)
  File "/localdisk/arunjose/code/unidist/unidist/core/backends/mpi/core/communication.py", line 66, in log_operation
    op_name = common.get_op_name(op_type)
  File "/localdisk/arunjose/code/unidist/unidist/core/backends/mpi/core/common.py", line 87, in get_op_name
    op_name = operations_dict.get(op, None)
TypeError: unhashable type: 'dict'