Skip to content

Commit fe4c12c

Browse files
committed
Merge pull request #250 from personalrobotics/feature/futures
Added a Future class.
2 parents 574bd9e + 5613f38 commit fe4c12c

File tree

1 file changed

+263
-0
lines changed

1 file changed

+263
-0
lines changed

src/prpy/futures.py

Lines changed: 263 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,263 @@
1+
#!/usr/bin/env python
2+
3+
# Copyright (c) 2015, Carnegie Mellon University
4+
# All rights reserved.
5+
# Authors: Michael Koval <mkoval@cs.cmu.edu>
6+
# Pras Velagapudi <pkv@cs.cmu.edu>
7+
#
8+
# Redistribution and use in source and binary forms, with or without
9+
# modification, are permitted provided that the following conditions are met:
10+
#
11+
# - Redistributions of source code must retain the above copyright notice, this
12+
# list of conditions and the following disclaimer.
13+
# - Redistributions in binary form must reproduce the above copyright notice,
14+
# this list of conditions and the following disclaimer in the documentation
15+
# and/or other materials provided with the distribution.
16+
# - Neither the name of Carnegie Mellon University nor the names of its
17+
# contributors may be used to endorse or promote products derived from this
18+
# software without specific prior written permission.
19+
#
20+
# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
21+
# AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
22+
# IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
23+
# ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE
24+
# LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
25+
# CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
26+
# SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
27+
# INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
28+
# CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
29+
# ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
30+
# POSSIBILITY OF SUCH DAMAGE.
31+
32+
import logging
33+
import threading
34+
from .exceptions import InternalError
35+
36+
logger = logging.getLogger(__name__)
37+
38+
39+
class FutureError(Exception):
40+
""" An error returned by a prpy.futures.Future. """
41+
pass
42+
43+
44+
class TimeoutError(FutureError):
45+
""" A future was not complete within the specified timeout. """
46+
pass
47+
48+
49+
class CancelledError(FutureError):
50+
""" A future failed to complete because it was cancelled. """
51+
pass
52+
53+
54+
class Future(object):
55+
def __init__(self):
56+
self.lock = threading.RLock()
57+
58+
self._is_done = False
59+
self._is_error = False
60+
self._is_cancelled = False
61+
62+
self._handle = None
63+
self._result = None
64+
self._exception = None
65+
66+
self._condition = threading.Condition(self.lock)
67+
self._callbacks = []
68+
69+
def done(self):
70+
""" Return True if the call was cancelled or finished running. """
71+
with self.lock:
72+
return self._is_done
73+
74+
def cancel(self):
75+
""" Attempt to cancel the call. """
76+
raise NotImplementedError('Cancelling is not supported.')
77+
78+
def cancelled(self):
79+
""" Returns True if the call was successfully cancelled. """
80+
with self.lock:
81+
return self._is_done and self._is_cancelled
82+
83+
def result(self, timeout=None):
84+
"""
85+
Wait for and return the result of the call wrapped by this future.
86+
87+
If the call hasn't yet completed then this method will wait up to
88+
`timeout` seconds. If the call hasn't completed in `timeout` seconds,
89+
then a TimeoutError is raised. `timeout` can be an int or float. If
90+
`timeout` is unspecified or None, there is no limit to the wait time.
91+
92+
If the future is cancelled before completing then CancelledError will
93+
be raised.
94+
95+
If the call raised an exception, this method raises the same exception.
96+
97+
@param timeout: seconds to wait for a result, None to wait forever
98+
@type timeout: int or float or None
99+
@returns: the result of the call wrapped by the future
100+
"""
101+
with self.lock:
102+
if not self._is_done:
103+
self._condition.wait(timeout)
104+
105+
if not self._is_done:
106+
raise TimeoutError()
107+
elif self._is_cancelled:
108+
raise CancelledError()
109+
elif self._exception is not None:
110+
raise self._exception
111+
else:
112+
return self._result
113+
114+
def exception(self, timeout=None):
115+
"""
116+
Return the exception raised by the call.
117+
118+
If the call hasn't yet completed then this method will wait up to
119+
`timeout` seconds. If the call hasn't completed in `timeout` seconds,
120+
then a TimeoutError is raised. `timeout` can be an int or float. If
121+
`timeout` is unspecified or None, there is no limit to the wait time.
122+
123+
If the future is cancelled before completing then CancelledError will
124+
be raised.
125+
126+
If the call completed without raising, None is returned.
127+
128+
@param timeout: seconds to wait for a result, None to wait forever
129+
@type timeout: int or float or None
130+
@returns: the exception raised by the call, None if completed normally
131+
"""
132+
with self.lock:
133+
if not self._is_done:
134+
self._condition.wait(timeout)
135+
136+
if not self._is_done:
137+
raise TimeoutError()
138+
elif self._is_cancelled:
139+
raise CancelledError()
140+
elif self._exception is not None:
141+
return self._exception
142+
else:
143+
return None
144+
145+
def add_done_callback(self, fn):
146+
"""
147+
Attach a function to the future to be called on completion.
148+
149+
`fn` will be called, with the future as its only argument, when the
150+
future is cancelled or finishes running. If `fn` was already added as a
151+
callback, this will raise a ValueError.
152+
153+
Added callables are called in the order that they were added and are
154+
always called in a thread belonging to the process that added them. If
155+
the callable raises a Exception subclass, it will be logged and
156+
ignored. If the callable raises a BaseException subclass, the behavior
157+
is undefined.
158+
159+
If the future has already completed or been cancelled, `fn` will be
160+
called immediately.
161+
162+
@param fn: a function that will be called on completion
163+
@type fn: (ResultType) -> None
164+
"""
165+
with self.lock:
166+
if self._is_done:
167+
if fn in self._callbacks:
168+
raise ValueError('Callback is already registered.')
169+
170+
self._callbacks.append(fn)
171+
do_call = False
172+
else:
173+
do_call = True
174+
175+
if do_call:
176+
fn(self)
177+
178+
def remove_done_callback(self, fn):
179+
"""
180+
Remove a function from being called on completion of this future.
181+
182+
If `fn` is not registered as a callback, this will raise a ValueError.
183+
184+
@param fn: a function that will no longer be called on completion
185+
@type fn: (ResultType) -> None
186+
"""
187+
with self.lock:
188+
try:
189+
self._callbacks.remove(fn)
190+
except ValueError:
191+
raise ValueError('Callback was not registered.')
192+
193+
def set_result(self, result):
194+
""" Set the result of this Future. """
195+
self._result = result
196+
self._set_done()
197+
198+
def set_cancelled(self):
199+
""" Flag this Future as being cancelled. """
200+
self._is_cancelled = True
201+
self._set_done()
202+
203+
def set_exception(self, exception):
204+
""" Indicates that an exception has occurred. """
205+
self._exception = exception
206+
self._set_done()
207+
208+
def _set_done(self):
209+
""" Mark this future as done and return a callback function. """
210+
with self.lock:
211+
if self._is_done:
212+
raise InternalError('This future is already done.')
213+
214+
self._is_done = True
215+
callbacks = list(self._callbacks)
216+
217+
self._condition.notify_all()
218+
219+
for callback_fn in callbacks:
220+
try:
221+
callback_fn(self)
222+
except Exception:
223+
self.logger.exception('Callback raised an exception.')
224+
225+
226+
def defer(fn, executor=None, args=(), kwargs={}):
227+
"""
228+
Simple helper to run a function in a thread and pass the result via Future.
229+
230+
This helper wraps a function call and immediately returns a Future for its
231+
result. It then executes the function in a new thread or on a provided
232+
executor and sets the result of the Future as soon as it is available.
233+
234+
Note that this execution is extremely simplistic. Cancellation is not
235+
supported, and no fixed thread pool is used (unless specified via a custom
236+
executor).
237+
238+
@param fn: the function that will be called
239+
@param executor: an executor that has a `.submit()` function, or None
240+
@param args: a list of positional arguments to pass to the function
241+
@param kwargs: a list of keyword arguments to pass to the function
242+
@returns: a future representing the result of this function
243+
"""
244+
# Create a future to store the result of the function.
245+
future = Future()
246+
247+
# Create a wrapper that calls the function and stores the result.
248+
def wrapper():
249+
try:
250+
result = fn(*args, **kwargs)
251+
future.set_result(result)
252+
except BaseException as e:
253+
future.set_exception(e)
254+
255+
# Use the specified executor or new thread to start running the function.
256+
if executor is not None:
257+
executor.submit(wrapper)
258+
else:
259+
t = threading.Thread(target=wrapper)
260+
t.start()
261+
262+
# Return the implicit result as a future.
263+
return future

0 commit comments

Comments
 (0)