This repository was archived by the owner on Oct 8, 2022. It is now read-only.

Feature/multi threading #25

Open · wants to merge 12 commits into base: develop

10 changes: 7 additions & 3 deletions trainer/cli.py
@@ -59,9 +59,13 @@ def __post_init__(self, motion, log_level, log_file):
     def _load_robot(self):
         self._robot = simulation.reset(self._scene, self.robot)
 
-    def train(self, output, chunk_length=3, num_chunk=50, **kwargs):
-        trained = trainer.train(self._scene, self._motion, self._robot,
-                                chunk_length, num_chunk, **kwargs)
+    def train(self, output, **kwargs):
+        def make_scene(_):
+            gui_client = BulletClient(connection_mode=pybullet.DIRECT)
+            scene = Scene(self.timestep, self.frame_skip, client=gui_client)
+            robot = simulation.reset(scene, self.robot)
+            return scene, robot
+        trained = trainer.train(self._motion, make_scene, **kwargs)
         trained.dump(output)
 
     def preview(self):
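
A sketch of the new contract between cli.py and trainer.train (illustrative code, not part of the diff): the trainer no longer receives one shared scene/robot pair but a factory that it calls once per worker, so every optimizer thread drives its own headless (DIRECT) pybullet client. Assuming the same Scene and simulation helpers imported by cli.py, and placeholder values for the timestep, frame skip, and robot file:

    import pybullet
    from pybullet_utils.bullet_client import BulletClient
    # Scene, simulation, trainer are this repository's modules as used in cli.py;
    # their import paths are omitted here.

    def make_scene(worker_index):
        client = BulletClient(connection_mode=pybullet.DIRECT)  # one physics server per worker
        scene = Scene(0.0165, 8, client=client)                 # placeholder timestep / frame_skip
        robot = simulation.reset(scene, "robot.urdf")           # placeholder robot description
        return scene, robot

    trained = trainer.train(motion, make_scene, num_workers=4, chunk_length=3)
    trained.dump("out.fom")
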
10 changes: 5 additions & 5 deletions trainer/evaluation.py
@@ -7,7 +7,7 @@
 from .simulation import apply_joints
 
 
-def calc_effector_reward(motion, robot, frame, ke, wl, wr):
+def calc_effector_reward(motion, robot, frame, *, ke, wl, wr):
     diff = 0
     for name, effector in frame.effectors.items():
         pose = robot.link_state(name).pose
@@ -26,7 +26,7 @@ def calc_effector_reward(motion, robot, frame, ke, wl, wr):
     return - math.exp(normalized) + 1
 
 
-def calc_stabilization_reward(frame, pre_positions, ks):
+def calc_stabilization_reward(frame, pre_positions, *, ks):
     if pre_positions is None:
         return 0
 
@@ -35,11 +35,11 @@ def calc_stabilization_reward(frame, pre_positions, ks):
     return - math.exp(normalized) + 1
 
 
-def calc_reward(motion, robot, frame, pre_positions, we=1, ws=0.1, ke=1, ks=1, wl=1, wr=0.005):
+def calc_reward(motion, robot, frame, pre_positions, *, we=1, ws=0.1, ke=1, ks=1, wl=1, wr=0.005):
     # TODO: Use more clear naming of hyperparameters
 
-    e = calc_effector_reward(motion, robot, frame, ke, wl, wr)
-    s = calc_stabilization_reward(frame, pre_positions, ks)
+    e = calc_effector_reward(motion, robot, frame, ke=ke, wl=wl, wr=wr)
+    s = calc_stabilization_reward(frame, pre_positions, ks=ks)
     return e * we + s * ws
 
 
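
The `*` added to these signatures makes the reward hyperparameters keyword-only, so call sites must name them; the updated calls inside calc_reward above already do. A minimal illustration (placeholder values, not code from the PR):

    r = calc_reward(motion, robot, frame, pre_positions, we=1, ws=0.1)    # OK: named
    e = calc_effector_reward(motion, robot, frame, ke=1, wl=1, wr=0.005)  # OK: named
    # calc_effector_reward(motion, robot, frame, 1, 1, 0.005)  -> TypeError after this change
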
88 changes: 64 additions & 24 deletions trainer/train.py
@@ -1,7 +1,10 @@
 import numpy as np
-from typing import Dict
+from typing import Dict, Optional, List, Callable, Tuple
 import dataclasses
 from logging import getLogger
+import math
+from concurrent import futures
+import threading
 
 from nevergrad.optimization import optimizerlib
 from nevergrad.instrumentation import InstrumentedFunction
@@ -39,47 +42,86 @@ def save(scene: Scene, robot: Robot):
         return StateWithJoints(scene.save_state(), torques)
 
 
-def train_chunk(scene: Scene, motion: flom.Motion, robot: Robot, start: float, init_weights: np.ndarray, init_state: StateWithJoints, algorithm: str = 'OnePlusOne', num_iteration: int = 1000, weight_factor: float = 0.01, stddev: float = 1, **kwargs):
+@dataclasses.dataclass
+class Env:
+    scene: Scene
+    robot: Robot
+
+    state: Optional[StateWithJoints] = None
+
+    def __post_init__(self):
+        self.save()
+
+    def save(self):
+        self.state = StateWithJoints.save(self.scene, self.robot)
+
+    def restore(self):
+        self.state.restore(self.scene, self.robot)
+
+
+def train_chunk(motion: flom.Motion, envs: List[Env], start: float, init_weights: np.ndarray, *, algorithm: str = 'OnePlusOne', num_iteration: int = 1000, weight_factor: float = 0.01, stddev: float = 1, **kwargs):
     weight_shape = np.array(init_weights).shape
 
-    def step(weights):
-        init_state.restore(scene, robot)
+    thread_envs = {} # type: Dict[int, Env]
+
+    def step(weights, env: Env = None):
+        env = env or thread_envs[threading.get_ident()]
+
+        env.restore()
+
         reward_sum = 0
-        start_ts = scene.ts
+        start_ts = env.scene.ts
 
-        pre_positions = try_get_pre_positions(scene, motion, start=start)
+        pre_positions = try_get_pre_positions(env.scene, motion, start=start)
 
         for init_weight, frame_weight in zip(init_weights, weights):
-            frame = motion.frame_at(start + scene.ts - start_ts)
+            frame = motion.frame_at(start + env.scene.ts - start_ts)
 
             frame.positions = apply_weights(
-                frame.positions, (init_weight + frame_weight) * weight_factor)
-            apply_joints(robot, frame.positions)
+                frame.positions, init_weight + frame_weight * weight_factor)
+            apply_joints(env.robot, frame.positions)
 
-            scene.step()
+            env.scene.step()
 
-            reward_sum += calc_reward(motion, robot, frame, pre_positions, **kwargs)
+            reward_sum += calc_reward(motion, env.robot, frame, pre_positions, **kwargs)
 
             pre_positions = frame.positions
 
         return -reward_sum

+    used_idx = 0
+    lock = threading.Lock()
+    def register_thread():
+        nonlocal used_idx, thread_envs
+        with lock:
+            thread_envs[threading.get_ident()] = envs[used_idx]
+            used_idx += 1
+
     weights_param = Gaussian(mean=0, std=stddev, shape=weight_shape)
     inst_step = InstrumentedFunction(step, weights_param)
     optimizer = optimizerlib.registry[algorithm](
-        dimension=inst_step.dimension, budget=num_iteration, num_workers=1)
-    recommendation = optimizer.optimize(inst_step)
+        dimension=inst_step.dimension, budget=num_iteration, num_workers=len(envs))
+
+    with futures.ThreadPoolExecutor(max_workers=optimizer.num_workers, initializer=register_thread) as executor:
+        recommendation = optimizer.optimize(inst_step, executor=executor)
     weights = np.reshape(recommendation, weight_shape)

-    reward = step(weights)
+    for e in envs:
+        reward = step(weights, e)
+        e.save()
 
-    state = StateWithJoints.save(scene, robot)
-    return reward, weights, state
+    return reward, weights * weight_factor
 
 
-def train(scene, motion, robot, chunk_length=3, num_chunk=100, weight_factor=0.01, **kwargs):
-    chunk_duration = scene.dt * chunk_length
+def train(motion: flom.Motion, make_scene: Callable[[int], Tuple[Scene, Robot]], *, num_workers: int = 5, chunk_length: int = 3, num_chunk: Optional[int] = None, **kwargs):
+    envs = [Env(*make_scene(i)) for i in range(num_workers)]
+    first_env = envs[0]
+
+    chunk_duration = first_env.scene.dt * chunk_length
+
+    if num_chunk is None:
+        num_chunk = math.ceil(motion.length() / chunk_duration)
+
     total_length = chunk_duration * num_chunk
     log.info(f"chunk duration: {chunk_duration} s")
     log.info(f"motion length: {motion.length()} s")
@@ -88,22 +130,20 @@ def train(scene, motion, robot, chunk_length=3, num_chunk=100, weight_factor=0.0
     if total_length < motion.length():
         log.warning(f"A total length to train is shorter than the length of motion")
 
-    num_frames = int(motion.length() / scene.dt)
+    num_frames = int(motion.length() / first_env.scene.dt)
     num_joints = len(list(motion.joint_names())) # TODO: Call len() directly
     weights = np.zeros(shape=(num_frames, num_joints))
     log.info(f"shape of weights: {weights.shape}")
     log.debug(f"kwargs: {kwargs}")
 
-    last_state = StateWithJoints.save(scene, robot)
     for chunk_idx in range(num_chunk):
         start = chunk_idx * chunk_duration
         start_idx = chunk_idx * chunk_length % num_frames
 
         r = range(start_idx, start_idx + chunk_length)
         in_weights = [weights[i % num_frames] for i in r]
         log.info(f"start training chunk {chunk_idx} ({start}~)")
-        reward, out_weights, last_state = train_chunk(
-            scene, motion, robot, start, in_weights, last_state, weight_factor=weight_factor, **kwargs)
+        reward, out_weights = train_chunk(motion, envs, start, in_weights, **kwargs)
         for i, w in zip(r, out_weights):
             weights[i % num_frames] = w
 
@@ -117,8 +157,8 @@ def train(scene, motion, robot, chunk_length=3, num_chunk=100, weight_factor=0.0
         new_motion.set_effector_weight(name, motion.effector_weight(name))
 
     for i, frame_weight in enumerate(weights):
-        t = i * scene.dt
+        t = i * first_env.scene.dt
         new_frame = motion.frame_at(t)
-        new_frame.positions = apply_weights(new_frame.positions, frame_weight * weight_factor)
+        new_frame.positions = apply_weights(new_frame.positions, frame_weight)
         new_motion.insert_keyframe(t, new_frame)
     return new_motion
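
For orientation, the thread-to-environment wiring in train_chunk can be read in isolation: the ThreadPoolExecutor initializer runs once per worker thread and claims one Env under a lock, and step later finds that thread's Env via threading.get_ident(). A stripped-down sketch of the pattern (not the PR's exact code; ThreadPoolExecutor's initializer argument requires Python 3.7+):

    import threading
    from concurrent import futures

    envs = [f"env-{i}" for i in range(4)]   # stand-ins for Env instances
    thread_envs = {}
    used_idx = 0
    lock = threading.Lock()

    def register_thread():
        # Runs once per worker thread, before it processes any task.
        global used_idx
        with lock:
            thread_envs[threading.get_ident()] = envs[used_idx]
            used_idx += 1

    def step(_):
        # Each call runs in some worker thread and sees only that thread's env.
        return thread_envs[threading.get_ident()]

    with futures.ThreadPoolExecutor(max_workers=len(envs), initializer=register_thread) as ex:
        print(sorted(set(ex.map(step, range(32)))))   # at most 4 distinct envs

Because the optimizer's num_workers is set to len(envs), the degree of parallelism is fixed by how many scene/robot pairs make_scene is asked to build.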