Skip to content

[actor_pool] Using ActorPool.map_unordered in DEAP causes "A worker died or was killed while executing task" #3

@ghost

Description

The docstring of `worker.get` (i.e. `ray.get`) mentions that this method will issue a warning if it is running inside an async context. The problem I am encountering is likely caused by this.

import ray
import time
from ray.util import ActorPool
from deap import algorithms, base, creator, gp, tools
......

@ray.remote(num_cpus=1)
class Ray_Deap_Map():
    """Ray actor (one CPU each) that evaluates one batch of inputs at a time.

    The optional ``creator_setup`` and ``pset_creator`` callables are stored
    on the actor and invoked once at construction time, so that state which
    only exists in the driver's global scope is rebuilt inside the actor.
    For GA only ``creator_setup`` is needed; GP needs both.
    """

    def __init__(self, creator_setup=None, pset_creator=None):
        # issue 946? Ensure non trivial startup to prevent bad load balance across a cluster
        time.sleep(0.01)

        # Store each hook and run it right away, creator first, then pset,
        # to recreate the scope that lived in the driver's globals.
        for attr_name, hook in (("creator_setup", creator_setup),
                                ("pset_creator", pset_creator)):
            setattr(self, attr_name, hook)
            if hook is not None:
                hook()

    def ray_remote_eval_batch(self, f, zipped_input):
        """Apply ``f`` to every element of a batch.

        ``zipped_input`` is a ``(batch, batch_id)`` pair. Each result is
        returned as ``(f(element), batch_id)`` so the caller can restore the
        batch order afterwards.
        """
        batch, batch_id = zipped_input
        tagged_results = []
        for element in batch:
            tagged_results.append((f(element), batch_id))
        return tagged_results

class Ray_Deap_Map_Manager():
    """Distribute a ``map`` call over a pool of ``Ray_Deap_Map`` actors.

    The number of workers is taken from the CPU count Ray reports for the
    cluster. ``creator_setup`` and ``pset_creator`` are forwarded unchanged
    to every actor that gets created.
    """

    def __init__(self, creator_setup=None, pset_creator=None):
        # Can adjust the number of processes in ray.init or when launching cluster
        self.n_workers = int(ray.cluster_resources()['CPU'])

        # recreate scope from global (for ex need toolbox in gp too)
        self.creator_setup = creator_setup
        self.pset_creator = pset_creator

    def map(self, func, iterable):
        """Return ``[func(x) for x in iterable]`` in input order.

        Args:
            func: callable applied to every element.
            iterable: the inputs; any iterable is accepted (it is
                materialised into a list once).

        Returns:
            A list with one result per input element, in input order.
            An empty input yields an empty list without creating actors.
        """
        # Materialise once: lets generators through and gives a reliable len().
        items = list(iterable)

        # Nothing to evaluate: avoid a zero worker count (division by zero)
        # and avoid spinning up actors for no work.
        if not items:
            return []

        if self.n_workers <= 1:
            # At most one worker: a plain local map is enough (useful for testing).
            return list(map(func, items))

        # Never use more workers than there are items.
        n_workers = min(self.n_workers, len(items))

        # Ceiling division keeps batches as even as possible; the previous
        # "floor + 1" over-sized batches whenever the split was exact.
        n_per_batch = -(-len(items) // n_workers)
        batches = [items[start:start + n_per_batch]
                   for start in range(0, len(items), n_per_batch)]
        batch_ids = range(len(batches))

        # One actor per batch actually produced (never more than n_workers).
        eval_pool = ActorPool(
            [Ray_Deap_Map.remote(self.creator_setup, self.pset_creator)
             for _ in batch_ids])

        unordered_results = list(eval_pool.map_unordered(
            lambda actor, input_tuple: actor.ray_remote_eval_batch.remote(func, input_tuple),
            zip(batches, batch_ids)))

        # Every result in a batch carries that batch's id, and batches are
        # never empty here, so the first entry's id identifies the batch.
        ordered_batch_results = sorted(unordered_results,
                                       key=lambda batch: batch[0][1])

        # Flatten the batches into a single list of fitness values.
        return [fitness for batch in ordered_batch_results for fitness, _ in batch]

# This is what we register as map in deap toolbox.
# For GA no need to provide pset_creator. Both needed for GP
def ray_deap_map(func, pop, creator_setup=None, pset_creator=None):
    """Map ``func`` over ``pop`` through a freshly built Ray_Deap_Map_Manager.

    The manager decides whether batching is needed and creates the remote
    actors that do the work. ``creator_setup`` and ``pset_creator`` are
    handed to the manager as-is.
    """
    return Ray_Deap_Map_Manager(creator_setup, pset_creator).map(func, pop)

def Tool(pset, max_c, ids):
    # Build and return a DEAP toolbox whose "evaluate" and "map" entries are
    # wired for Ray-based evaluation. Part of the body is elided in this
    # excerpt ("......"), so how `pset` is used is not visible here.
    toolbox = base.Toolbox()
    ......
    # "evaluate" is EvaluationBacktest with toolbox, max_c and ids pre-bound;
    # callers only supply the individual.
    toolbox.register("evaluate", EvaluationBacktest, toolbox=toolbox, max_c=max_c, ids=ids)
    # "map" is routed through ray_deap_map. Creator_Setup and Pset_Creator are
    # defined outside this excerpt.
    toolbox.register("map", ray_deap_map, creator_setup=Creator_Setup,
                     pset_creator=Pset_Creator)
    return toolbox

def EvaluationBacktest(individual, toolbox, max_c, ids):
    # Fitness function: compiles the individual into a callable, runs it, and
    # returns a one-element tuple. Parts of the body are elided in this
    # excerpt ("......"), so how max_c and ids are used is not visible here.
    func = toolbox.compile(expr=individual)
    ......
    try:
        df = func(......)
        ......
    except:
        # Any failure while running the compiled expression is mapped to the
        # penalty value -100.
        # NOTE(review): a bare `except:` also catches KeyboardInterrupt and
        # SystemExit; `except Exception:` is probably what is intended.
        result = -100
    # Trailing comma: the result is returned as a 1-tuple.
    return result,


# The call that fails: evaluate the individuals through the toolbox's "map"
# entry, which Tool() registers as ray_deap_map.
fitnesses = toolbox.map(toolbox.evaluate, invalid_ind)

Here is the error message:

2021-06-10 21:12:18,290	WARNING worker.py:1115 -- The log monitor on node ALPHA failed with the following error:
OSError: [WinError 87] 参数错误。

The above exception was the direct cause of the following exception:

Traceback (most recent call last):
  File "C:\Users\Administrator\anaconda3\lib\site-packages\ray\_private\log_monitor.py", line 364, in <module>
    log_monitor.run()
  File "C:\Users\Administrator\anaconda3\lib\site-packages\ray\_private\log_monitor.py", line 285, in run
    self.open_closed_files()
  File "C:\Users\Administrator\anaconda3\lib\site-packages\ray\_private\log_monitor.py", line 172, in open_closed_files
    self.close_all_files()
  File "C:\Users\Administrator\anaconda3\lib\site-packages\ray\_private\log_monitor.py", line 107, in close_all_files
    os.kill(file_info.worker_pid, 0)
SystemError: <built-in function kill> returned a result with an error set

2021-06-10 21:12:18,295	WARNING worker.py:1115 -- A worker died or was killed while executing task ffffffffffffffff151a7c83ccb29b66d20c565901000000.
Traceback (most recent call last):

  File "D:\Factor\GA\GA_LongShortStrategy.py", line 569, in <module>
    pop = algorithms.eaMuPlusLambda(pop, toolbox, mu=mu, lambda_=lambda_,

  File "C:\Users\Administrator\anaconda3\lib\site-packages\deap\algorithms.py", line 320, in eaMuPlusLambda
    fitnesses = toolbox.map(toolbox.evaluate, invalid_ind)

  File "D:\Factor\GA\RayMap.py", line 89, in ray_deap_map
    results = map_ray_manager.map(func, pop)

  File "D:\Factor\GA\RayMap.py", line 70, in map
    unordered_results = list(eval_pool.map_unordered(

  File "C:\Users\Administrator\anaconda3\lib\site-packages\ray\util\actor_pool.py", line 91, in map_unordered
    yield self.get_next_unordered()

  File "C:\Users\Administrator\anaconda3\lib\site-packages\ray\util\actor_pool.py", line 210, in get_next_unordered
    return ray.get(future)

  File "C:\Users\Administrator\anaconda3\lib\site-packages\ray\_private\client_mode_hook.py", line 47, in wrapper
    return func(*args, **kwargs)

  File "C:\Users\Administrator\anaconda3\lib\site-packages\ray\worker.py", line 1483, in get
    raise value

RayActorError: The actor died unexpectedly before finishing this task.

Any help would be appreciated.

Metadata

Metadata

Assignees

No one assigned

    Labels

    No labels
    No labels

    Projects

    No projects

    Milestone

    No milestone

    Relationships

    None yet

    Development

    No branches or pull requests

    Issue actions