Skip to content

Commit 5613f38

Browse files
committed
Adds futures to prpy.
This backports the futures implementation from adapy upstream into prpy. It does not use the futures for anything, but makes them available for use in later PRs and dependent libraries as part of the `prpy.futures` package.
1 parent 23057b3 commit 5613f38

File tree

1 file changed

+263
-0
lines changed

1 file changed

+263
-0
lines changed

src/prpy/futures.py

Lines changed: 263 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,263 @@
1+
#!/usr/bin/env python
2+
3+
# Copyright (c) 2015, Carnegie Mellon University
4+
# All rights reserved.
5+
# Authors: Michael Koval <mkoval@cs.cmu.edu>
6+
# Pras Velagapudi <pkv@cs.cmu.edu>
7+
#
8+
# Redistribution and use in source and binary forms, with or without
9+
# modification, are permitted provided that the following conditions are met:
10+
#
11+
# - Redistributions of source code must retain the above copyright notice, this
12+
# list of conditions and the following disclaimer.
13+
# - Redistributions in binary form must reproduce the above copyright notice,
14+
# this list of conditions and the following disclaimer in the documentation
15+
# and/or other materials provided with the distribution.
16+
# - Neither the name of Carnegie Mellon University nor the names of its
17+
# contributors may be used to endorse or promote products derived from this
18+
# software without specific prior written permission.
19+
#
20+
# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
21+
# AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
22+
# IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
23+
# ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE
24+
# LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
25+
# CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
26+
# SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
27+
# INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
28+
# CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
29+
# ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
30+
# POSSIBILITY OF SUCH DAMAGE.
31+
32+
import logging
33+
import threading
34+
from .exceptions import InternalError
35+
36+
logger = logging.getLogger(__name__)
37+
38+
39+
class FutureError(Exception):
40+
""" An error returned by a prpy.futures.Future. """
41+
pass
42+
43+
44+
class TimeoutError(FutureError):
45+
""" A future was not complete within the specified timeout. """
46+
pass
47+
48+
49+
class CancelledError(FutureError):
50+
""" A future failed to complete because it was cancelled. """
51+
pass
52+
53+
54+
class Future(object):
55+
def __init__(self):
56+
self.lock = threading.RLock()
57+
58+
self._is_done = False
59+
self._is_error = False
60+
self._is_cancelled = False
61+
62+
self._handle = None
63+
self._result = None
64+
self._exception = None
65+
66+
self._condition = threading.Condition(self.lock)
67+
self._callbacks = []
68+
69+
def done(self):
70+
""" Return True if the call was cancelled or finished running. """
71+
with self.lock:
72+
return self._is_done
73+
74+
def cancel(self):
75+
""" Attempt to cancel the call. """
76+
raise NotImplementedError('Cancelling is not supported.')
77+
78+
def cancelled(self):
79+
""" Returns True if the call was successfully cancelled. """
80+
with self.lock:
81+
return self._is_done and self._is_cancelled
82+
83+
def result(self, timeout=None):
84+
"""
85+
Wait for and return the result of the call wrapped by this future.
86+
87+
If the call hasn't yet completed then this method will wait up to
88+
`timeout` seconds. If the call hasn't completed in `timeout` seconds,
89+
then a TimeoutError is raised. `timeout` can be an int or float. If
90+
`timeout` is unspecified or None, there is no limit to the wait time.
91+
92+
If the future is cancelled before completing then CancelledError will
93+
be raised.
94+
95+
If the call raised an exception, this method raises the same exception.
96+
97+
@param timeout: seconds to wait for a result, None to wait forever
98+
@type timeout: int or float or None
99+
@returns: the result of the call wrapped by the future
100+
"""
101+
with self.lock:
102+
if not self._is_done:
103+
self._condition.wait(timeout)
104+
105+
if not self._is_done:
106+
raise TimeoutError()
107+
elif self._is_cancelled:
108+
raise CancelledError()
109+
elif self._exception is not None:
110+
raise self._exception
111+
else:
112+
return self._result
113+
114+
def exception(self, timeout=None):
115+
"""
116+
Return the exception raised by the call.
117+
118+
If the call hasn't yet completed then this method will wait up to
119+
`timeout` seconds. If the call hasn't completed in `timeout` seconds,
120+
then a TimeoutError is raised. `timeout` can be an int or float. If
121+
`timeout` is unspecified or None, there is no limit to the wait time.
122+
123+
If the future is cancelled before completing then CancelledError will
124+
be raised.
125+
126+
If the call completed without raising, None is returned.
127+
128+
@param timeout: seconds to wait for a result, None to wait forever
129+
@type timeout: int or float or None
130+
@returns: the exception raised by the call, None if completed normally
131+
"""
132+
with self.lock:
133+
if not self._is_done:
134+
self._condition.wait(timeout)
135+
136+
if not self._is_done:
137+
raise TimeoutError()
138+
elif self._is_cancelled:
139+
raise CancelledError()
140+
elif self._exception is not None:
141+
return self._exception
142+
else:
143+
return None
144+
145+
def add_done_callback(self, fn):
146+
"""
147+
Attach a function to the future to be called on completion.
148+
149+
`fn` will be called, with the future as its only argument, when the
150+
future is cancelled or finishes running. If `fn` was already added as a
151+
callback, this will raise a ValueError.
152+
153+
Added callables are called in the order that they were added and are
154+
always called in a thread belonging to the process that added them. If
155+
the callable raises a Exception subclass, it will be logged and
156+
ignored. If the callable raises a BaseException subclass, the behavior
157+
is undefined.
158+
159+
If the future has already completed or been cancelled, `fn` will be
160+
called immediately.
161+
162+
@param fn: a function that will be called on completion
163+
@type fn: (ResultType) -> None
164+
"""
165+
with self.lock:
166+
if self._is_done:
167+
if fn in self._callbacks:
168+
raise ValueError('Callback is already registered.')
169+
170+
self._callbacks.append(fn)
171+
do_call = False
172+
else:
173+
do_call = True
174+
175+
if do_call:
176+
fn(self)
177+
178+
def remove_done_callback(self, fn):
179+
"""
180+
Remove a function from being called on completion of this future.
181+
182+
If `fn` is not registered as a callback, this will raise a ValueError.
183+
184+
@param fn: a function that will no longer be called on completion
185+
@type fn: (ResultType) -> None
186+
"""
187+
with self.lock:
188+
try:
189+
self._callbacks.remove(fn)
190+
except ValueError:
191+
raise ValueError('Callback was not registered.')
192+
193+
def set_result(self, result):
194+
""" Set the result of this Future. """
195+
self._result = result
196+
self._set_done()
197+
198+
def set_cancelled(self):
199+
""" Flag this Future as being cancelled. """
200+
self._is_cancelled = True
201+
self._set_done()
202+
203+
def set_exception(self, exception):
204+
""" Indicates that an exception has occurred. """
205+
self._exception = exception
206+
self._set_done()
207+
208+
def _set_done(self):
209+
""" Mark this future as done and return a callback function. """
210+
with self.lock:
211+
if self._is_done:
212+
raise InternalError('This future is already done.')
213+
214+
self._is_done = True
215+
callbacks = list(self._callbacks)
216+
217+
self._condition.notify_all()
218+
219+
for callback_fn in callbacks:
220+
try:
221+
callback_fn(self)
222+
except Exception:
223+
self.logger.exception('Callback raised an exception.')
224+
225+
226+
def defer(fn, executor=None, args=(), kwargs={}):
227+
"""
228+
Simple helper to run a function in a thread and pass the result via Future.
229+
230+
This helper wraps a function call and immediately returns a Future for its
231+
result. It then executes the function in a new thread or on a provided
232+
executor and sets the result of the Future as soon as it is available.
233+
234+
Note that this execution is extremely simplistic. Cancellation is not
235+
supported, and no fixed thread pool is used (unless specified via a custom
236+
executor).
237+
238+
@param fn: the function that will be called
239+
@param executor: an executor that has a `.submit()` function, or None
240+
@param args: a list of positional arguments to pass to the function
241+
@param kwargs: a list of keyword arguments to pass to the function
242+
@returns: a future representing the result of this function
243+
"""
244+
# Create a future to store the result of the function.
245+
future = Future()
246+
247+
# Create a wrapper that calls the function and stores the result.
248+
def wrapper():
249+
try:
250+
result = fn(*args, **kwargs)
251+
future.set_result(result)
252+
except BaseException as e:
253+
future.set_exception(e)
254+
255+
# Use the specified executor or new thread to start running the function.
256+
if executor is not None:
257+
executor.submit(wrapper)
258+
else:
259+
t = threading.Thread(target=wrapper)
260+
t.start()
261+
262+
# Return the implicit result as a future.
263+
return future

0 commit comments

Comments
 (0)