Skip to content

Commit a601b49

Browse files
author
Michael Koval
committed
Rough PostProcessPath function.
1 parent e2f0839 commit a601b49

File tree

1 file changed

+127
-0
lines changed

1 file changed

+127
-0
lines changed

src/prpy/base/robot.py

Lines changed: 127 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -181,6 +181,133 @@ def GetTrajectoryManipulators(self, traj):
181181

182182
return active_manipulators
183183

184+
def PostProcessPath(self, path, defer=False, executor=None, **kwargs):
185+
""" Post-process a geometric path to prepare it for execution.
186+
187+
This method post-processes a geometric path by (optionally) optimizing
188+
it and timing it. Three different post-processing pipelines are used:
189+
190+
1. For constrained trajectories, we do not modify the geometric path
191+
and retime the path to be time-optimal. This trajectory must stop
192+
at every waypoint. The only exception is for...
193+
2. For smooth trajectories, we attempt to fit a time-optimal smooth
194+
curve through the waypoints (not implemented). If this curve is
195+
not collision free, then we fall back on...
196+
3. By default, we run a smoother that jointly times and smooths the
197+
path. This algorithm can change the geometric path to optimize
198+
runtime.
199+
200+
The behavior in (1) and (2) can be forced by passing constrained=True
201+
or smooth=True. By default, the case is inferred by the tag(s) attached
202+
to the trajectory: (1) is triggered by the CONSTRAINED tag and (2) is
203+
tiggered by the SMOOTH tag.
204+
205+
Options an be passed to each post-processing routine using the
206+
shortcut-options, smoothing_options, and retiming_options **kwargs
207+
dictionaries. If no "timelimit" is specified in any of these
208+
dictionaries, it defaults to default_timelimit seconds.
209+
210+
@param path un-timed OpenRAVE trajectory
211+
@param defer return immediately with a future trajectory
212+
@param executor executor to use when defer = True
213+
@param constrained the path is constrained; do not change it
214+
@param smooth the path is smooth; attempt to execute it directly
215+
@param default_timelimit timelimit for all operations, if not set
216+
@param shortcut_options kwargs to ShortcutPath for shortcutting
217+
@param smoothing_options kwargs to RetimeTrajectory for smoothing
218+
@param retiming_options kwargs to RetimeTrajectory for timing
219+
@return trajectory ready for execution
220+
"""
221+
222+
def do_postprocess(path, constrained=None, smooth=None,
223+
default_timelimit=0.5, shortcut_options=None,
224+
smoothing_options=None, retiming_options=None):
225+
from ..planning.base import Tags
226+
from ..util import GetTrajectoryTags, CopyTrajectory
227+
228+
# Default parameters.
229+
if shortcut_options is None:
230+
shortcut_options = dict()
231+
if smoothing_options is None:
232+
smoothing_options = dict()
233+
if retiming_options is None:
234+
retiming_options = dict()
235+
236+
shortcut_options.setdefault('timelimit', default_timelimit)
237+
smoothing_options.setdefault('timelimit', default_timelimit)
238+
retiming_options.setdefault('timelimit', default_timelimit)
239+
240+
# Read default parameters from the trajectory's tags.
241+
tags = GetTrajectoryTags(path)
242+
243+
if constrained is None:
244+
constrained = tags.get(Tags.CONSTRAINED, False)
245+
logger.debug('Detected "%s" tag on trajectory: Setting'
246+
' constrained = True.', Tags.CONSTRAINED)
247+
248+
if smooth is None:
249+
smooth = tags.get(Tags.SMOOTH, False)
250+
logger.debug('Detected "%s" tag on trajectory: Setting smooth'
251+
' = True', Tags.SMOOTH)
252+
253+
with Clone(self.GetEnv()) as cloned_env:
254+
cloned_robot = cloned_env.Cloned(self)
255+
256+
# Planners only operate on the active DOFs. We'll set any DOFs
257+
# in the trajectory as active.
258+
env = path.GetEnv()
259+
cspec = path.GetConfigurationSpecification()
260+
used_bodies = cspec.ExtractUsedBodies(env)
261+
if self not in used_bodies:
262+
raise ValueError(
263+
'Robot "{:s}" is not in the trajectory.'.format(
264+
self.GetName()))
265+
266+
dof_indices, _ = cspec.ExtractUsedIndices(self)
267+
cloned_robot.SetActiveDOFs(dof_indices)
268+
logger.debug(
269+
'Setting robot "%s" DOFs %s as active for post-processing.',
270+
cloned_robot.GetName(), list(dof_indices))
271+
272+
# Directly compute a timing of smooth trajectories.
273+
if smooth:
274+
logger.warning(
275+
'Post-processing smooth paths is not supported.'
276+
' Using the default post-processing logic; this may'
277+
' significantly change the geometric path.'
278+
)
279+
280+
# The trajectory is constrained. Retime it without changing the
281+
# geometric path.
282+
if constrained:
283+
logger.debug('Retiming a constrained path. The output'
284+
' trajectory will stop at every waypoint.')
285+
traj = self.retimer.RetimeTrajectory(
286+
cloned_robot, path, defer=False, **retiming_options)
287+
else:
288+
# The trajectory is not constrained, so we can shortcut it
289+
# before execution.
290+
logger.debug('Shortcutting an unconstrained path.')
291+
path = self.simplifier.ShortcutPath(
292+
cloned_robot, path, defer=False, **shortcut_options)
293+
294+
logger.debug('Smoothing an unconstrained path.')
295+
traj = self.smoother.RetimeTrajectory(
296+
cloned_robot, path, defer=False, **smoothing_options)
297+
298+
return CopyTrajectory(traj, env=self.GetEnv())
299+
300+
if defer:
301+
from trollius.executor import get_default_executor
302+
from trollius.futures import wrap_future
303+
304+
if executor is None:
305+
executor = get_default_executor()
306+
307+
return wrap_future(executor.submit(do_postprocess, path, **kwargs))
308+
else:
309+
return do_postprocess(path, **kwargs)
310+
184311
def ExecutePath(self, path, simplify=True, smooth=True, defer=False,
185312
timeout=1., **kwargs):
186313

0 commit comments

Comments
 (0)