Description
It's mentioned in the documentation for worker.get that this method will issue a warning if it's running inside an async context. The problem I'm encountering is likely caused by this.
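For context, my understanding is that "running inside async context" means calling ray.get() from inside an asyncio event loop, for example inside an async actor method. Below is a minimal sketch of that pattern, for illustration only (the produce task and AsyncConsumer actor are made-up names and are not part of my code):

import ray

ray.init()

@ray.remote
def produce():
    return 1

@ray.remote
class AsyncConsumer:
    # An actor with an async method runs its methods inside an asyncio event loop.
    async def consume(self):
        ref = produce.remote()
        # Blocking ray.get() inside the async context is what triggers the warning;
        # `await ref` would yield to the event loop instead.
        return ray.get(ref)

consumer = AsyncConsumer.remote()
print(ray.get(consumer.consume.remote()))

My own code does not use async actors directly; the relevant part is below: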
import ray
import time
from ray.util import ActorPool
from deap import algorithms, base, creator, gp, tools

......

@ray.remote(num_cpus=1)
class Ray_Deap_Map():
    def __init__(self, creator_setup=None, pset_creator=None):
        # issue 946? Ensure non-trivial startup to prevent bad load balance across a cluster
        time.sleep(0.01)
        # recreate scope from global
        # For GA no need to provide pset_creator. Both needed for GP
        self.creator_setup = creator_setup
        if creator_setup is not None:
            self.creator_setup()
        self.pset_creator = pset_creator
        if pset_creator is not None:
            self.pset_creator()

    def ray_remote_eval_batch(self, f, zipped_input):
        iterable, id_ = zipped_input
        # attach id so we can reorder the batches
        return [(f(i), id_) for i in iterable]


class Ray_Deap_Map_Manager():
    def __init__(self, creator_setup=None, pset_creator=None):
        # Can adjust the number of processes in ray.init or when launching the cluster
        self.n_workers = int(ray.cluster_resources()['CPU'])
        # recreate scope from global (for ex need toolbox in gp too)
        self.creator_setup = creator_setup
        self.pset_creator = pset_creator

    def map(self, func, iterable):
        if self.n_workers == 1:
            # only 1 worker, normal listcomp/map will work fine. Useful for testing code?
            ##results = [func(item) for item in iterable]
            results = list(map(func, iterable))  # forced eval to time it
        else:
            # many workers, let's use ActorPool
            if len(iterable) < self.n_workers:
                n_workers = len(iterable)
            else:
                n_workers = self.n_workers
            n_per_batch = int(len(iterable) / n_workers) + 1
            batches = [iterable[i:i + n_per_batch] for i in range(0, len(iterable), n_per_batch)]
            id_for_reorder = range(len(batches))
            eval_pool = ActorPool(
                [Ray_Deap_Map.remote(self.creator_setup, self.pset_creator) for _ in range(n_workers)])
            unordered_results = list(eval_pool.map_unordered(
                lambda actor, input_tuple: actor.ray_remote_eval_batch.remote(func, input_tuple),
                zip(batches, id_for_reorder)))
            # ensure order of batches
            ordered_batch_results = [batch for batch_id in id_for_reorder for batch in unordered_results
                                     if batch_id == batch[0][1]]
            # flatten batches to list of fitnesses
            results = [item[0] for sublist in ordered_batch_results for item in sublist]
        return results


# This is what we register as map in the deap toolbox.
# For GA no need to provide pset_creator. Both needed for GP
def ray_deap_map(func, pop, creator_setup=None, pset_creator=None):
    # Manager will determine if batching is needed and create remote actors to do the work
    map_ray_manager = Ray_Deap_Map_Manager(creator_setup, pset_creator)
    results = map_ray_manager.map(func, pop)
    return results


def Tool(pset, max_c, ids):
    toolbox = base.Toolbox()
    ......
    toolbox.register("evaluate", EvaluationBacktest, toolbox=toolbox, max_c=max_c, ids=ids)
    toolbox.register("map", ray_deap_map, creator_setup=Creator_Setup,
                     pset_creator=Pset_Creator)
    return toolbox


def EvaluationBacktest(individual, toolbox, max_c, ids):
    func = toolbox.compile(expr=individual)
    ......
    try:
        df = func(......)
        ......
    except:
        result = -100
    return result,


fitnesses = toolbox.map(toolbox.evaluate, invalid_ind)
Here is the error message:
2021-06-10 21:12:18,290 WARNING worker.py:1115 -- The log monitor on node ALPHA failed with the following error:
OSError: [WinError 87] The parameter is incorrect.
The above exception was the direct cause of the following exception:
Traceback (most recent call last):
  File "C:\Users\Administrator\anaconda3\lib\site-packages\ray\_private\log_monitor.py", line 364, in <module>
    log_monitor.run()
  File "C:\Users\Administrator\anaconda3\lib\site-packages\ray\_private\log_monitor.py", line 285, in run
    self.open_closed_files()
  File "C:\Users\Administrator\anaconda3\lib\site-packages\ray\_private\log_monitor.py", line 172, in open_closed_files
    self.close_all_files()
  File "C:\Users\Administrator\anaconda3\lib\site-packages\ray\_private\log_monitor.py", line 107, in close_all_files
    os.kill(file_info.worker_pid, 0)
SystemError: <built-in function kill> returned a result with an error set
2021-06-10 21:12:18,295 WARNING worker.py:1115 -- A worker died or was killed while executing task ffffffffffffffff151a7c83ccb29b66d20c565901000000.
Traceback (most recent call last):
  File "D:\Factor\GA\GA_LongShortStrategy.py", line 569, in <module>
    pop = algorithms.eaMuPlusLambda(pop, toolbox, mu=mu, lambda_=lambda_,
  File "C:\Users\Administrator\anaconda3\lib\site-packages\deap\algorithms.py", line 320, in eaMuPlusLambda
    fitnesses = toolbox.map(toolbox.evaluate, invalid_ind)
  File "D:\Factor\GA\RayMap.py", line 89, in ray_deap_map
    results = map_ray_manager.map(func, pop)
  File "D:\Factor\GA\RayMap.py", line 70, in map
    unordered_results = list(eval_pool.map_unordered(
  File "C:\Users\Administrator\anaconda3\lib\site-packages\ray\util\actor_pool.py", line 91, in map_unordered
    yield self.get_next_unordered()
  File "C:\Users\Administrator\anaconda3\lib\site-packages\ray\util\actor_pool.py", line 210, in get_next_unordered
    return ray.get(future)
  File "C:\Users\Administrator\anaconda3\lib\site-packages\ray\_private\client_mode_hook.py", line 47, in wrapper
    return func(*args, **kwargs)
  File "C:\Users\Administrator\anaconda3\lib\site-packages\ray\worker.py", line 1483, in get
    raise value
RayActorError: The actor died unexpectedly before finishing this task.
Any help would be appreciated.