Skip to content

Commit cee0028

Browse files
committed
replaces sync streaming retries object with generator function
1 parent 37c64a0 commit cee0028

File tree

3 files changed

+118
-363
lines changed

3 files changed

+118
-363
lines changed

google/api_core/retry.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -66,7 +66,7 @@ def check_if_exists():
6666

6767
from google.api_core import datetime_helpers
6868
from google.api_core import exceptions
69-
from google.api_core.retry_streaming import RetryableGenerator
69+
from google.api_core.retry_streaming import retry_target_generator
7070
from google.auth import exceptions as auth_exceptions
7171

7272
_LOGGER = logging.getLogger(__name__)
@@ -361,7 +361,7 @@ def retry_wrapped_func(*args, **kwargs):
361361
sleep_generator = exponential_sleep_generator(
362362
self._initial, self._maximum, multiplier=self._multiplier
363363
)
364-
retry_func = RetryableGenerator if self._is_stream else retry_target
364+
retry_func = retry_target_generator if self._is_stream else retry_target
365365
return retry_func(
366366
target,
367367
self._predicate,

google/api_core/retry_streaming.py

Lines changed: 44 additions & 262 deletions
Original file line numberDiff line numberDiff line change
@@ -73,268 +73,50 @@ def _build_timeout_error(
7373
return exc_list[-1], None
7474

7575

76-
class RetryableGenerator(Generator[T, Any, None]):
77-
"""
78-
Generator wrapper for retryable streaming RPCs.
79-
RetryableGenerator will be used when initilizing a retry with
80-
``Retry(is_stream=True)``.
81-
82-
When ``is_stream=False``, the target is treated as a callable,
83-
and will retry when the callable returns an error. When ``is_stream=True``,
84-
the target will be treated as a callable that retruns an iterable. Instead
85-
of just wrapping the initial call in retry logic, the entire iterable is
86-
wrapped, with each yield passing through RetryableGenerator. If any yield
87-
in the stream raises a retryable exception, the entire stream will be
88-
retried.
89-
90-
Important Note: when a stream is encounters a retryable error, it will
91-
silently construct a fresh iterator instance in the background
92-
and continue yielding (likely duplicate) values as if no error occurred.
93-
This is the most general way to retry a stream, but it often is not the
94-
desired behavior. Example: iter([1, 2, 1/0]) -> [1, 2, 1, 2, ...]
95-
96-
There are two ways to build more advanced retry logic for streams:
97-
98-
1. Wrap the target
99-
Use a ``target`` that maintains state between retries, and creates a
100-
different generator on each retry call. For example, you can wrap a
101-
network call in a function that modifies the request based on what has
102-
already been returned:
103-
104-
```
105-
def attempt_with_modified_request(target, request, seen_items=[]):
106-
# remove seen items from request on each attempt
107-
new_request = modify_request(request, seen_items)
108-
new_generator = target(new_request)
109-
for item in new_generator:
110-
yield item
111-
seen_items.append(item)
112-
113-
retry_wrapped = Retry(is_stream=True)(attempt_with_modified_request, target, request, [])
114-
```
115-
116-
2. Wrap the RetryableGenerator
117-
Alternatively, you can wrap the RetryableGenerator itself before
118-
passing it to the end-user to add a filter on the stream. For
119-
example, you can keep track of the items that were successfully yielded
120-
in previous retry attempts, and only yield new items when the
121-
new attempt surpasses the previous ones:
122-
123-
``
124-
def retryable_with_filter(target):
125-
stream_idx = 0
126-
# reset stream_idx when the stream is retried
127-
def on_error(e):
128-
nonlocal stream_idx
129-
stream_idx = 0
130-
# build retryable
131-
retryable_gen = Retry(is_stream=True, on_error=on_error, ...)(target)
132-
# keep track of what has been yielded out of filter
133-
yielded_items = []
134-
for item in retryable_gen:
135-
if stream_idx >= len(yielded_items):
136-
yield item
137-
yielded_items.append(item)
138-
elif item != previous_stream[stream_idx]:
139-
raise ValueError("Stream differs from last attempt")"
140-
stream_idx += 1
141-
142-
filter_retry_wrapped = retryable_with_filter(target)
143-
```
144-
"""
145-
146-
def __init__(
147-
self,
148-
target: Callable[[], Iterable[T]],
149-
predicate: Callable[[Exception], bool],
150-
sleep_generator: Iterable[float],
151-
timeout: Optional[float] = None,
152-
on_error: Optional[Callable[[Exception], None]] = None,
153-
exception_factory: Optional[
154-
Callable[
155-
[List[Exception], bool, float], Tuple[Exception, Optional[Exception]]
156-
]
157-
] = None,
158-
check_timeout_on_yield: bool = False,
159-
):
160-
"""
161-
Args:
162-
target: The function to call to produce iterables for each retry.
163-
This must be a nullary function - apply arguments with
164-
`functools.partial`.
165-
predicate: A callable used to determine if an
166-
exception raised by the target should be considered retryable.
167-
It should return True to retry or False otherwise.
168-
sleep_generator: An infinite iterator that determines
169-
how long to sleep between retries.
170-
timeout: How long to keep retrying the target, in seconds.
171-
on_error: A function to call while processing a
172-
retryable exception. Any error raised by this function will *not*
173-
be caught.
174-
exception_factory: A function that creates an exception to raise
175-
when the retry fails. The function takes three arguments:
176-
a list of exceptions that occurred during the retry, a boolean
177-
indicating whether the failure is due to retry timeout, and the original
178-
timeout value (for building a helpful error message). It is expected to
179-
return a tuple of the exception to raise and (optionally) a source
180-
exception to chain to the raised exception.
181-
If not provided, a default exception will be raised.
182-
check_timeout_on_yield: If True, the timeout value will be checked
183-
after each yield. If the timeout has been exceeded, the generator
184-
will raise an exception from exception_factory.
185-
Note that this adds an overhead to each yield, so it is better
186-
to add the timeout logic to the wrapped stream when possible.
187-
"""
188-
self.target_fn = target
189-
self.active_target: Iterator[T] = self.target_fn().__iter__()
190-
self.predicate = predicate
191-
self.sleep_generator = iter(sleep_generator)
192-
self.on_error = on_error
193-
self.deadline: Optional[float] = time.monotonic() + timeout if timeout else None
194-
self._check_timeout_on_yield = check_timeout_on_yield
195-
self.error_list: List[Exception] = []
196-
self._exc_factory = partial(
197-
exception_factory or _build_timeout_error, timeout_val=timeout
198-
)
199-
200-
def __iter__(self) -> Generator[T, Any, None]:
201-
"""
202-
Implement the iterator protocol.
203-
"""
204-
return self
205-
206-
def _handle_exception(self, exc) -> None:
207-
"""
208-
When an exception is raised while iterating over the active_target,
209-
check if it is retryable. If so, create a new active_target and
210-
continue iterating. If not, raise the exception.
211-
"""
212-
self.error_list.append(exc)
213-
if not self.predicate(exc):
214-
final_exc, src_exc = self._exc_factory(
215-
exc_list=self.error_list, is_timeout=False
216-
)
217-
raise final_exc from src_exc
218-
else:
219-
# run on_error callback if provided
220-
if self.on_error:
221-
self.on_error(exc)
222-
try:
223-
next_sleep = next(self.sleep_generator)
224-
except StopIteration:
225-
raise ValueError("Sleep generator stopped yielding sleep values")
226-
# if deadline is exceeded, raise exception
227-
if self.deadline is not None:
228-
next_attempt = time.monotonic() + next_sleep
229-
self._check_timeout(next_attempt)
230-
# sleep before retrying
231-
_LOGGER.debug(
232-
"Retrying due to {}, sleeping {:.1f}s ...".format(exc, next_sleep)
233-
)
234-
time.sleep(next_sleep)
235-
self.active_target = self.target_fn().__iter__()
236-
237-
def _check_timeout(self, current_time: float) -> None:
238-
"""
239-
Helper function to check if the timeout has been exceeded, and raise an exception if so.
240-
241-
Args:
242-
- current_time: the timestamp to check against the deadline
243-
- source_exception: the exception that triggered the timeout check, if any
244-
Raises:
245-
- Exception from exception_factory if the timeout has been exceeded
246-
"""
247-
if self.deadline is not None and self.deadline < current_time:
248-
exc, src_exc = self._exc_factory(exc_list=self.error_list, is_timeout=True)
249-
raise exc from src_exc
250-
251-
def __next__(self) -> T:
252-
"""
253-
Implement the iterator protocol.
254-
255-
Returns:
256-
- the next value of the active_target iterator
257-
"""
258-
# check for expired timeouts before attempting to iterate
259-
if self._check_timeout_on_yield:
260-
self._check_timeout(time.monotonic())
76+
def retry_target_generator(
77+
target: Callable[[], Iterable[T]],
78+
predicate: Callable[[Exception], bool],
79+
sleep_generator: Iterable[float],
80+
timeout: Optional[float] = None,
81+
on_error: Optional[Callable[[Exception], None]] = None,
82+
exception_factory: Optional[
83+
Callable[
84+
[List[Exception], bool, float], Tuple[Exception, Optional[Exception]]
85+
]
86+
] = None,
87+
**kwargs,
88+
) -> Generator[T, Any, None]:
89+
timeout = kwargs.get("deadline", timeout)
90+
deadline: Optional[float] = time.monotonic() + timeout if timeout else None
91+
error_list: List[Exception] = []
92+
exc_factory = partial(
93+
exception_factory or _build_timeout_error, timeout_val=timeout
94+
)
95+
96+
for sleep in sleep_generator:
97+
# Start a new retry loop
26198
try:
262-
return next(self.active_target)
99+
# create and yeild from a new instance of the generator from input generator function
100+
subgenerator = target()
101+
return (yield from subgenerator)
102+
# handle exceptions raised by the subgenerator
263103
except Exception as exc:
264-
self._handle_exception(exc)
265-
# if retryable exception was handled, try again with new active_target
266-
return self.__next__()
267-
268-
def close(self) -> None:
269-
"""
270-
Close the active_target if supported. (e.g. target is a generator)
271-
272-
Raises:
273-
- AttributeError if the active_target does not have a close() method
274-
"""
275-
if getattr(self.active_target, "close", None):
276-
casted_target = cast(Generator, self.active_target)
277-
return casted_target.close()
278-
else:
279-
raise AttributeError(
280-
"close() not implemented for {}".format(self.active_target)
281-
)
282-
283-
def send(self, *args, **kwargs) -> T:
284-
"""
285-
Call send on the active_target if supported. (e.g. target is a generator)
286-
287-
If an exception is raised, a retry may be attempted before returning
288-
a result.
289-
290-
Args:
291-
- *args: arguments to pass to the wrapped generator's send method
292-
- **kwargs: keyword arguments to pass to the wrapped generator's send method
293-
Returns:
294-
- the next value of the active_target iterator after calling send
295-
Raises:
296-
- AttributeError if the active_target does not have a send() method
297-
"""
298-
# check for expired timeouts before attempting to iterate
299-
if self._check_timeout_on_yield:
300-
self._check_timeout(time.monotonic())
301-
if getattr(self.active_target, "send", None):
302-
casted_target = cast(Generator, self.active_target)
303-
try:
304-
return casted_target.send(*args, **kwargs)
305-
except Exception as exc:
306-
self._handle_exception(exc)
307-
# if exception was retryable, use new target for return value
308-
return self.__next__()
309-
else:
310-
raise AttributeError(
311-
"send() not implemented for {}".format(self.active_target)
312-
)
313-
314-
def throw(self, *args, **kwargs) -> T:
315-
"""
316-
Call throw on the active_target if supported. (e.g. target is a generator)
317-
318-
If an exception is raised, a retry may be attempted before returning
319-
a result.
104+
error_list.append(exc)
105+
if not predicate(exc):
106+
exc, source_exc = exc_factory(exc_list=error_list, is_timeout=False)
107+
raise exc from source_exc
108+
if on_error is not None:
109+
on_error(exc)
110+
finally:
111+
if subgenerator is not None and getattr(subgenerator, "close", None):
112+
subgenerator.close()
113+
114+
if deadline is not None and time.monotonic() + sleep > deadline:
115+
exc, source_exc = exc_factory(exc_list=error_list, is_timeout=True)
116+
raise exc from source_exc
117+
_LOGGER.debug(
118+
"Retrying due to {}, sleeping {:.1f}s ...".format(error_list[-1], sleep)
119+
)
120+
time.sleep(sleep)
320121

321-
Args:
322-
- *args: arguments to pass to the wrapped generator's throw method
323-
- **kwargs: keyword arguments to pass to the wrapped generator's throw method
324-
Returns:
325-
- the next vale of the active_target iterator after calling throw
326-
Raises:
327-
- AttributeError if the active_target does not have a throw() method
328-
"""
329-
if getattr(self.active_target, "throw", None):
330-
casted_target = cast(Generator, self.active_target)
331-
try:
332-
return casted_target.throw(*args, **kwargs)
333-
except Exception as exc:
334-
self._handle_exception(exc)
335-
# if retryable exception was handled, return next from new active_target
336-
return self.__next__()
337-
else:
338-
raise AttributeError(
339-
"throw() not implemented for {}".format(self.active_target)
340-
)
122+
raise ValueError("Sleep generator stopped yielding sleep values.")

0 commit comments

Comments
 (0)