Blocking get to a worker fails inside a remote function  #258

Open
@arunjose696

Description

A blocking get issued to a worker from inside a remote function fails if the target worker sends an operation back to the requesting worker before replying. For example, suppose worker_A sends a get request to worker_B. The get fails if worker_B sends some operation to worker_A before processing that get request.
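The sequence above can be modeled with a toy, pure-Python sketch. It assumes a single untagged FIFO channel from worker_B to worker_A, so each receive simply takes whichever message arrived first. This is a simplification for illustration, not how unidist's MPI communication layer is actually implemented; the channel helpers and dict payloads are hypothetical.

```python
from collections import deque

# Hypothetical model: one untagged FIFO channel per (source, destination) pair,
# so a receiver gets whatever message arrived first.
channels = {("B", "A"): deque()}


def send(src, dst, msg):
    channels[(src, dst)].append(msg)


def recv(src, dst):
    return channels[(src, dst)].popleft()


# worker_A has asked worker_B for data and will block on recv(B -> A).
# Before replying, worker_B submits an operation back to worker_A.
send("B", "A", {"kind": "operation", "op": "EXECUTE"})
send("B", "A", {"kind": "data", "value": 1})

# worker_A's blocking get expects data but receives the operation instead...
got_by_get = recv("B", "A")
# ...and worker_A's event loop later receives the data it expected to be an operation.
got_by_loop = recv("B", "A")

print(got_by_get["kind"], got_by_loop["kind"])  # operation data
```

Both receivers end up with the other's message, which matches the swap described in the triage below.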

Triage Details

In the reproducer, the blocking get in func2, i.e. unidist.get(func1.remote()), waits for data from func1 (with a comm.recv) but instead receives an operation type (common.Operation.EXECUTE). Because of this mismatch between send and recv, async_wrap in loop.py later receives the data that was intended for the blocking get().
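To illustrate why matching messages by kind avoids this kind of send/recv mismatch, here is a toy channel whose receives only match a requested tag, loosely mirroring how MPI matches a receive on source and tag. This is an assumption for illustration only, not a description of unidist's code or of its eventual fix; the `TaggedChannel` class and the `TAG_OPERATION`/`TAG_DATA` constants are hypothetical.

```python
from collections import deque

# Hypothetical tags; with MPI these would be the integer `tag` argument of send/recv.
TAG_OPERATION = 0
TAG_DATA = 1


class TaggedChannel:
    """Toy channel that matches receives by tag, loosely mirroring MPI message matching."""

    def __init__(self):
        self._pending = deque()

    def send(self, tag, payload):
        self._pending.append((tag, payload))

    def recv(self, tag):
        # Return the oldest message with the requested tag; messages with other
        # tags stay queued. A real MPI recv would block instead of raising.
        for i, (msg_tag, payload) in enumerate(self._pending):
            if msg_tag == tag:
                del self._pending[i]
                return payload
        raise LookupError(f"no pending message with tag {tag}")


channel = TaggedChannel()
# worker_B sends an operation before replying to worker_A's get.
channel.send(TAG_OPERATION, "EXECUTE")
channel.send(TAG_DATA, 1)

value = channel.recv(TAG_DATA)         # the blocking get skips the operation
op_type = channel.recv(TAG_OPERATION)  # the worker loop still sees the operation
print(value, op_type)  # 1 EXECUTE
```

The ordering of messages with the same tag is preserved, so each receiver sees its own messages in the order they were sent.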

The issue can be reproduced with the code below.

import unidist

unidist.init()

@unidist.remote
def func0():
    return 1

@unidist.remote
def func1():
    func0.remote()    
    return 1

@unidist.remote
def func2():       
    unidist.get(func1.remote())
    return 1

result = unidist.get(func2.remote())

Error:

    result = unidist.get(func2.remote())
  File "/localdisk/arunjose/code/unidist/unidist/api.py", line 160, in get
    return execution_backend.get(object_refs)
  File "/localdisk/arunjose/code/unidist/unidist/core/base/backend.py", line 286, in get
    return self._backend_cls.get(object_refs)
  File "/localdisk/arunjose/code/unidist/unidist/core/backends/mpi/backend.py", line 79, in get
    return mpi.get(data_ids)
  File "/localdisk/arunjose/code/unidist/unidist/core/backends/mpi/core/controller/api.py", line 270, in get
    values = [get_impl(data_id) for data_id in data_ids]
  File "/localdisk/arunjose/code/unidist/unidist/core/backends/mpi/core/controller/api.py", line 270, in <listcomp>
    values = [get_impl(data_id) for data_id in data_ids]
  File "/localdisk/arunjose/code/unidist/unidist/core/backends/mpi/core/controller/api.py", line 260, in get_impl
    raise value
TypeError: 'int' object is not subscriptable
Traceback (most recent call last):
  File "<string>", line 1, in <module>
  File "/localdisk/arunjose/code/unidist/unidist/api.py", line 92, in init
    init_backend()
  File "/localdisk/arunjose/code/unidist/unidist/core/base/utils.py", line 43, in init_backend
    initialize_mpi()
  File "/localdisk/arunjose/code/unidist/unidist/core/backends/mpi/utils.py", line 12, in initialize_mpi
    init()
  File "/localdisk/arunjose/code/unidist/unidist/core/backends/mpi/core/controller/api.py", line 156, in init
    asyncio.run(worker_loop())
  File "/localdisk/arunjose/miniconda3/envs/modin_on_unidist_2/lib/python3.8/asyncio/runners.py", line 44, in run
    return loop.run_until_complete(main)
  File "/localdisk/arunjose/miniconda3/envs/modin_on_unidist_2/lib/python3.8/asyncio/base_events.py", line 616, in run_until_complete
    return future.result()
  File "/localdisk/arunjose/code/unidist/unidist/core/backends/mpi/core/worker/loop.py", line 90, in worker_loop
    operation_type, source_rank = await async_wrap(
  File "/localdisk/arunjose/code/unidist/unidist/core/backends/mpi/core/worker/loop.py", line 63, in run
    return await loop.run_in_executor(executor, pfunc)
  File "/localdisk/arunjose/miniconda3/envs/modin_on_unidist_2/lib/python3.8/concurrent/futures/thread.py", line 57, in run
    result = self.fn(*self.args, **self.kwargs)
  File "/localdisk/arunjose/code/unidist/unidist/core/backends/mpi/core/communication.py", line 323, in recv_operation_type
    log_operation(op_type, status)
  File "/localdisk/arunjose/code/unidist/unidist/core/backends/mpi/core/communication.py", line 66, in log_operation
    op_name = common.get_op_name(op_type)
  File "/localdisk/arunjose/code/unidist/unidist/core/backends/mpi/core/common.py", line 87, in get_op_name
    op_name = operations_dict.get(op, None)
TypeError: unhashable type: 'dict'
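The two TypeErrors are consistent with the swapped messages. The sketch below assumes that an operation type is an int and that a data reply is a dict; it shows that subscripting an int and using a dict as a dictionary key (as `operations_dict.get(op, None)` does in get_op_name) raise the same two kinds of TypeError. Where exactly the get path subscripts its reply is not shown in the traceback, so that step is an assumption, and the values below are hypothetical stand-ins.

```python
# Hypothetical stand-ins: an operation type is an int, a data reply is a dict.
operations_dict = {0: "EXECUTE", 1: "GET"}  # hypothetical op-code -> name mapping
misrouted_to_get = 0                        # the get expected a dict but got an op type
misrouted_to_loop = {"id": 1, "data": 1}    # the loop expected an op type but got data

errors = []
try:
    # Assumed: code handling a data reply subscripts it.
    _ = misrouted_to_get["data"]
except TypeError as exc:
    errors.append(str(exc))
try:
    # Same lookup shape as in get_op_name: a dict cannot be used as a key.
    operations_dict.get(misrouted_to_loop, None)
except TypeError as exc:
    errors.append(str(exc))

for message in errors:
    print(message)
```

The exact wording of the messages can vary slightly between Python versions, but both are TypeErrors of the kind seen above.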

Metadata

Assignees

No one assigned

    Labels

    MPI (MPI backend related issues), bug 🦗 (Something isn't working)

    Type

    No type

    Projects

    No projects

    Milestone

    No milestone

    Relationships

    None yet

    Development

    No branches or pull requests
