feat: retry and retry_async support streaming rpcs #495
Changes from 120 commits
@@ -66,6 +66,7 @@ def check_if_exists():
 from google.api_core import datetime_helpers
 from google.api_core import exceptions
+from google.api_core.retry_streaming import retry_target_stream
 from google.auth import exceptions as auth_exceptions

 _LOGGER = logging.getLogger(__name__)
@@ -154,7 +155,7 @@ def retry_target(
     higher-level retry helper :class:`Retry`.

     Args:
-        target(Callable): The function to call and retry. This must be a
+        target(Callable[[], Any]): The function to call and retry. This must be a
             nullary function - apply arguments with `functools.partial`.
         predicate (Callable[Exception]): A callable used to determine if an
             exception raised by the target should be considered retryable.
@@ -301,6 +302,18 @@ class Retry(object): | |
maximum (float): The maximum amount of time to delay in seconds. | ||
multiplier (float): The multiplier applied to the delay. | ||
timeout (float): How long to keep retrying, in seconds. | ||
on_error (Callable[Exception]): A function to call while processing | ||
daniel-sanche marked this conversation as resolved.
Show resolved
Hide resolved
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. In
Should they be the same in both places? And it doesn't seem that the type hints in the There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Yeah they probably should, I think I was just avoiding changing too much of the existing comments If we're going to make a change, maybe we should remove the types from the comments entirely now that we have type annotations, so we don't have to duplicate it in multiple places? |
||
a retryable exception. Any error raised by this function will | ||
*not* be caught. | ||
is_stream (bool): Indicates whether the input function | ||
should be treated as an stream function (i.e. a Generator, | ||
daniel-sanche marked this conversation as resolved.
Show resolved
Hide resolved
|
||
or function that returns an Iterable). If True, the iterable | ||
will be wrapped with retry logic, and any failed outputs will | ||
restart the stream. If False, only the input function call itself | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Restart the stream from the last non-failed output, right ? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Unfortunately that's not really something we can do in a general way. We can create a new generator and start yielding from it again, but we have no guarantee that it will yield the same sequence as last time. (For example, in Bigtable we will modify the request after a failure to avoid requesting repeat data from the server) I considered adding a "filter" on top of the generator, and providing a default filter that raises an exception if a retry deviates from the initial stream, which would be one way to do what you requested. But in the end, I decided that it's probably better to keep this code simple and just pass on the generator values directly, and the caller can do their own filtering over top of the retry-stream There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Since this functionality is not intended for external users directly, but rather for the library generator and handwritten library code, this seems fine. But the generator and library developers need to be aware of this and design their Please expand the docstring of There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Sounds good, I added a much more documentation. Let me know that looks better So far this will mostly just be useful for hand-written layers. 
If we want to add this to gapic libraries automatically, we could try to classify the different kinds of streams our libraries return and provide a couple different presets for different kinds of streams. But so far that's been out of scope. |
||
will be retried. Defaults to False. | ||
To avoid duplicate values, retryable streams should typically be | ||
wrapped in additional filter logic before use. For more details, see | ||
``google.api_core.retry_streaming.retry_target_stream``. | ||
daniel-sanche marked this conversation as resolved.
Show resolved
Hide resolved
|
||
deadline (float): DEPRECATED: use `timeout` instead. For backward | ||
compatibility, if specified it will override the ``timeout`` parameter. | ||
""" | ||
|
@@ -313,7 +326,8 @@ def __init__(
     multiplier=_DEFAULT_DELAY_MULTIPLIER,
     timeout=_DEFAULT_DEADLINE,
     on_error=None,
-    **kwargs
+    is_stream=False,
+    **kwargs,
 ):
     self._predicate = predicate
     self._initial = initial
@@ -322,6 +336,7 @@ def __init__(
     self._timeout = kwargs.get("deadline", timeout)
     self._deadline = self._timeout
     self._on_error = on_error
+    self._is_stream = is_stream

 def __call__(self, func, on_error=None):
     """Wrap a callable with retry behavior.

@@ -346,7 +361,8 @@ def retry_wrapped_func(*args, **kwargs):
     sleep_generator = exponential_sleep_generator(
         self._initial, self._maximum, multiplier=self._multiplier
     )
-    return retry_target(
+    retry_func = retry_target_stream if self._is_stream else retry_target
+    return retry_func(
         target,
         self._predicate,
         sleep_generator,
@@ -61,6 +61,7 @@ async def check_if_exists():
 from google.api_core.retry import exponential_sleep_generator
 from google.api_core.retry import if_exception_type  # noqa: F401
 from google.api_core.retry import if_transient_error
+from google.api_core.retry_streaming_async import retry_target_stream


 _LOGGER = logging.getLogger(__name__)
@@ -74,13 +75,13 @@ async def check_if_exists():
 async def retry_target(
     target, predicate, sleep_generator, timeout=None, on_error=None, **kwargs
 ):
-    """Call a function and retry if it fails.
+    """Await a coroutine and retry if it fails.

     This is the lowest-level retry helper. Generally, you'll use the
     higher-level retry helper :class:`Retry`.

     Args:
-        target(Callable): The function to call and retry. This must be a
+        target(Callable[[], Any]): The function to call and retry. This must be a
             nullary function - apply arguments with `functools.partial`.
         predicate (Callable[Exception]): A callable used to determine if an
             exception raised by the target should be considered retryable.

Review comment: The streaming versions have type declarations. Do you want to have type declarations in the non-stream versions?

Reply: Sure, I was trying to avoid scope growth, but it probably makes sense to add at this point. Done.

Review comment: IIUC, the sleep-delay logic should be the same between all four versions of this function: sync vs async, unary vs streaming. I would suggest: […]

Reply: I was trying to avoid making changes to the existing retries, but I suppose with the new base classes, it makes sense to generalize them a bit. I added a shared […]. Note though that I had to make a (potentially breaking) change to the async unary retry logic to make it consistent. Previously async_unary would attempt to cancel early when the deadline is reached, while the other retries only check the deadline after the request terminates. The old behavior was very slow (using asyncio.wait_for) and caused race conditions. I'd consider it a bug, but fixing the behavior may count as a breaking change (even though gapic functions shouldn't be affected).
@@ -156,7 +157,7 @@ async def retry_target(


 class AsyncRetry:
-    """Exponential retry decorator for async functions.
+    """Exponential retry decorator for async coroutines.

     This class is a decorator used to add exponential back-off retry behavior
     to an RPC call.

@@ -175,6 +176,15 @@ class AsyncRetry:
     on_error (Callable[Exception]): A function to call while processing
         a retryable exception. Any error raised by this function will
         *not* be caught.
+    is_stream (bool): Indicates whether the input function
+        should be treated as a stream function (i.e. an AsyncGenerator,
+        or function or coroutine that returns an AsyncIterable).
+        If True, the iterable will be wrapped with retry logic, and any
+        failed outputs will restart the stream. If False, only the input
+        function call itself will be retried. Defaults to False.
+        To avoid duplicate values, retryable streams should typically be
+        wrapped in additional filter logic before use. For more details, see
+        ``google.api_core.retry_streaming_async.retry_target_stream``.
     deadline (float): DEPRECATED use ``timeout`` instead. If set it will
         override ``timeout`` parameter.
     """
@@ -187,6 +197,7 @@ def __init__(
     multiplier=_DEFAULT_DELAY_MULTIPLIER,
     timeout=_DEFAULT_TIMEOUT,
     on_error=None,
+    is_stream=False,
     **kwargs
 ):
     self._predicate = predicate

@@ -196,12 +207,13 @@ def __init__(
     self._timeout = kwargs.get("deadline", timeout)
     self._deadline = self._timeout
     self._on_error = on_error
+    self._is_stream = is_stream
 def __call__(self, func, on_error=None):
     """Wrap a callable with retry behavior.

     Args:
-        func (Callable): The callable to add retry behavior to.
+        func (Callable): The callable or stream to add retry behavior to.
         on_error (Callable[Exception]): A function to call while processing
             a retryable exception. Any error raised by this function will
             *not* be caught.
@@ -224,11 +236,29 @@ async def retry_wrapped_func(*args, **kwargs):
         target,
         self._predicate,
         sleep_generator,
-        self._timeout,
+        timeout=self._timeout,
         on_error=on_error,
     )

+    @functools.wraps(func)
+    def retry_wrapped_stream(*args, **kwargs):
+        """A wrapper that iterates over a target stream with retry."""
+        target = functools.partial(func, *args, **kwargs)
+        sleep_generator = exponential_sleep_generator(
+            self._initial, self._maximum, multiplier=self._multiplier
+        )
+        return retry_target_stream(
+            target,
+            self._predicate,
+            sleep_generator,
+            timeout=self._timeout,
+            on_error=on_error,
+        )
+
-    return retry_wrapped_func
+    if self._is_stream:
+        return retry_wrapped_stream
+    else:
+        return retry_wrapped_func

 def _replace(
     self,

Review comment: We can't use a function pointer here, like you did in the sync case ([…])? NB: I notice that you're making changes to this PR, so I can't find the code that I quoted in the current version, though it is in the PR that I cloned locally.

Reply: I think the only reason is because the existing […]. But I just made a change to make […]

Review comment: I like the new structure.
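The async branch above can be exercised with a minimal stand-in for `retry_target_stream`: an async generator that restarts the target on retryable errors. This is a sketch of the pattern, not the google.api_core implementation; `flaky` and the predicate are invented for the demo:

```python
import asyncio
import functools

# Minimal async stand-in for retry_target_stream: restart the target
# stream whenever the predicate says the error is retryable.
async def retry_target_stream(target, predicate, sleeps, timeout=None, on_error=None):
    for sleep in sleeps:
        try:
            async for item in target():
                yield item
            return  # stream finished cleanly
        except Exception as exc:
            if not predicate(exc):
                raise
            if on_error is not None:
                on_error(exc)
            await asyncio.sleep(sleep)
    raise ValueError("Sleep generator stopped yielding sleep values.")

async def demo():
    attempts = 0

    async def flaky():
        nonlocal attempts
        attempts += 1
        yield 1
        if attempts == 1:
            raise RuntimeError("transient")
        yield 2

    # like retry_wrapped_stream: bind args, then hand off to the retry helper
    target = functools.partial(flaky)
    stream = retry_target_stream(
        target, lambda e: isinstance(e, RuntimeError), [0.0] * 5
    )
    return [item async for item in stream]

result = asyncio.run(demo())  # first attempt restarts after yielding 1
```

Note that, exactly as the docstring warns, the restarted stream re-yields the value already seen.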
@@ -0,0 +1,221 @@
# Copyright 2023 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""
Generator wrapper for retryable streaming RPCs.
This function will be used when initializing a retry with
``Retry(is_stream=True)``.

When ``is_stream=False``, the target is treated as a callable,
and will retry when the callable returns an error. When ``is_stream=True``,
the target will be treated as a callable that returns an iterable. Instead
of just wrapping the initial call in retry logic, the entire iterable is
wrapped, with each yield passing through the retryable generator. If any yield
in the stream raises a retryable exception, the entire stream will be
retried.

Important Note: when a stream encounters a retryable error, it will
silently construct a fresh iterator instance in the background
and continue yielding (likely duplicate) values as if no error occurred.
This is the most general way to retry a stream, but it often is not the
desired behavior. Example: iter([1, 2, 1/0]) -> [1, 2, 1, 2, ...]

Review comment: I think this paragraph and the list are more appropriate in the […]

Reply: Yeah, my only worry is it would be adding a lot of text to the existing wall of text in […]. Do you think this should be moved into the main Retry docstring?

Review comment: Yes, I think that's the right level of abstraction for these comments. I suggest keeping the more immediate, more likely-to-be-referenced parts at the top of the comment, and these details further down.

Reply: done
There are two ways to build more advanced retry logic for streams:

1. Wrap the target
    Use a ``target`` that maintains state between retries, and creates a
    different generator on each retry call. For example, you can wrap a
    network call in a function that modifies the request based on what has
    already been returned:

    ```
    def attempt_with_modified_request(target, request, seen_items=[]):
        # remove seen items from request on each attempt
        new_request = modify_request(request, seen_items)
        new_generator = target(new_request)
        for item in new_generator:
            yield item
            seen_items.append(item)

    retry_wrapped_fn = Retry(is_stream=True)(attempt_with_modified_request)
    retryable_generator = retry_wrapped_fn(target, request)
    ```

Review comment: Hmm, I'm not sure why this comment didn't make it onto my previous review:

    ```
    def attempt_with_modified_request(target, request):
        seen_items = []
        def attempter():
            # remove seen items from request on each attempt
            new_request = modify_request(request, seen_items)
            new_generator = target(new_request)
            for item in new_generator:
                yield item
                seen_items.append(item)
        return attempter

    retry_wrapped = Retry(is_stream=True)(attempt_with_modified_request(target, request))
    ```

Reply: Good catch. That change looks fine to me, but I might have a slight preference for just breaking the end into two lines to avoid the extra nesting:

    ```
    def attempt_with_modified_request(target, request, seen_items=[]):
        # remove seen items from request on each attempt
        new_request = modify_request(request, seen_items)
        new_generator = target(new_request)
        for item in new_generator:
            yield item
            seen_items.append(item)

    retry_wrapped_fn = Retry(is_stream=True)(attempt_with_modified_request)
    retryable_generator = retry_wrapped_fn(target, request)
    ```

    Let me know what you think.

Review comment: OK, I think that should work. Had to spend some time on the various levels of indirection, with the decorators and generators.

Reply: Yeah I'm fine with either way, whatever seems more readable to you.
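A self-contained, runnable sketch of this "wrap the target" pattern. The flaky `fetch` source and the inline `while` retry loop (standing in for `Retry(is_stream=True)`) are hypothetical, invented for this illustration:

```python
def fetch(request):
    # Hypothetical flaky source: yields the requested items, failing once
    # mid-stream on the first (non-retried) request.
    for item in request["items"]:
        if item == 3 and not request["retried"]:
            raise RuntimeError("transient error")
        yield item

def attempt_with_modified_request(target, request, seen_items=None):
    seen_items = [] if seen_items is None else seen_items
    while True:  # stands in for the retry wrapper restarting the stream
        # remove seen items from the request on each attempt
        new_request = {
            "items": [i for i in request["items"] if i not in seen_items],
            "retried": bool(seen_items),
        }
        try:
            for item in target(new_request):
                yield item
                seen_items.append(item)
            return
        except RuntimeError:
            continue  # retry with the trimmed request

result = list(attempt_with_modified_request(fetch, {"items": [1, 2, 3, 4]}))
# no duplicates: each attempt skips items that were already yielded
```

Because the target rebuilds the request from `seen_items` on every attempt, the restarted stream resumes where the failed one left off instead of replaying from the beginning.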
2. Wrap the retry generator
    Alternatively, you can wrap the retryable generator itself before
    passing it to the end-user to add a filter on the stream. For
    example, you can keep track of the items that were successfully yielded
    in previous retry attempts, and only yield new items when the
    new attempt surpasses the previous ones:

    ```
    def retryable_with_filter(target):
        stream_idx = 0
        # reset stream_idx when the stream is retried
        def on_error(e):
            nonlocal stream_idx
            stream_idx = 0
        # build retryable
        retryable_gen = Retry(is_stream=True, on_error=on_error, ...)(target)
        # keep track of what has been yielded out of filter
        yielded_items = []
        for item in retryable_gen:
            if stream_idx >= len(yielded_items):
                yield item
                yielded_items.append(item)
            elif item != yielded_items[stream_idx]:
                raise ValueError("Stream differs from last attempt")
            stream_idx += 1

    filter_retry_wrapped = retryable_with_filter(target)
    ```
"""
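The filter idea can also be run end to end. In this sketch the toy `retrying` closure stands in for `Retry(is_stream=True, on_error=...)`, and `flaky_stream` is an invented example source; none of these names are library API:

```python
def flaky_stream():
    # Toy stream: fails after yielding [1, 2] on the first attempt only.
    flaky_stream.attempts += 1
    yield 1
    yield 2
    if flaky_stream.attempts == 1:
        raise RuntimeError("transient error")
    yield 3
flaky_stream.attempts = 0

def retryable_with_filter(target):
    stream_idx = 0
    yielded_items = []  # items already passed through the filter

    def retrying():
        # stand-in for the retryable generator: restart target on failure
        nonlocal stream_idx
        while True:
            stream_idx = 0  # equivalent of the on_error reset in the docstring
            try:
                yield from target()
                return
            except RuntimeError:
                continue

    for item in retrying():
        if stream_idx >= len(yielded_items):
            yielded_items.append(item)
            yield item  # new ground: pass the item through
        elif item != yielded_items[stream_idx]:
            raise ValueError("Stream differs from last attempt")
        stream_idx += 1

filtered = list(retryable_with_filter(flaky_stream))
# the retry re-yields 1 and 2, but the filter suppresses the duplicates
```

Without the filter, the raw retry would produce `[1, 2, 1, 2, 3]`; the index check turns that into a clean `[1, 2, 3]` while still detecting a retry that deviates from the first attempt.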

from typing import (
    Callable,
    Optional,
    List,
    Tuple,
    Iterable,
    Generator,
    TypeVar,
    Any,
    Union,
    cast,
)

import logging
import time
from functools import partial

from google.api_core import exceptions

_LOGGER = logging.getLogger(__name__)

T = TypeVar("T")

def _build_timeout_error(
    exc_list: List[Exception], is_timeout: bool, timeout_val: float
) -> Tuple[Exception, Optional[Exception]]:
    """
    Default exception_factory implementation. Builds an exception after the retry fails.

    Args:
      - exc_list (list[Exception]): list of exceptions that occurred during the retry
      - is_timeout (bool): whether the failure is due to the timeout value being exceeded,
            or due to a non-retryable exception
      - timeout_val (float): the original timeout value for the retry, for use in the exception message

    Returns:
      - tuple[Exception, Exception|None]: a tuple of the exception to be raised, and the cause exception if any
    """
    src_exc = exc_list[-1] if exc_list else None
    if is_timeout:
        return (
            exceptions.RetryError(
                "Timeout of {:.1f}s exceeded".format(timeout_val),
                src_exc,
            ),
            src_exc,
        )
    else:
        return exc_list[-1], None

Review comment: This seems more robust: [suggested change: […]]

Reply: I think the problem was with mypy, since src_exc can be None but we want to always return an exception. I made some changes to fall back to a default error if no exceptions are passed though. Let me know what you think.

def retry_target_stream(
    target: Callable[[], Union[Iterable[T], Generator[T, Any, None]]],
    predicate: Callable[[Exception], bool],
    sleep_generator: Iterable[float],
    timeout: Optional[float] = None,
    on_error: Optional[Callable[[Exception], None]] = None,
    exception_factory: Optional[
        Callable[[List[Exception], bool, float], Tuple[Exception, Optional[Exception]]]
    ] = None,
    **kwargs,
) -> Generator[T, Any, None]:
    """Create a generator wrapper that retries the wrapped stream if it fails.

    This is the lowest-level retry helper. Generally, you'll use the
    higher-level retry helper :class:`Retry`.

    Args:
        target: The generator function to call and retry. This must be a
            nullary function - apply arguments with `functools.partial`.
        predicate: A callable used to determine if an
            exception raised by the target should be considered retryable.
            It should return True to retry or False otherwise.
        sleep_generator: An infinite iterator that determines
            how long to sleep between retries.
        timeout: How long to keep retrying the target.
            Note: timeout is only checked before initiating a retry, so the target may
            run past the timeout value as long as it is healthy.
        on_error: A function to call while processing a
            retryable exception. Any error raised by this function will *not*
            be caught.
        exception_factory: A function that is called when the retryable reaches
            a terminal failure state, used to construct an exception to be raised.
            It is given a list of all exceptions encountered, a boolean indicating
            whether the failure was due to a timeout, and the original timeout value
            as arguments. It should return a tuple of the exception to be raised,
            along with the cause exception if any.
            If not provided, a default implementation will raise a RetryError
            on timeout, or the last exception encountered otherwise.

    Returns:
        Generator: A retryable generator that wraps the target generator function.

    Raises:
        ValueError: If the sleep generator stops yielding values.
        Exception: a custom exception specified by the exception_factory if provided.
            If no exception_factory is provided:
                google.api_core.RetryError: If the deadline is exceeded while retrying.
                Exception: If the target raises an error that isn't retryable.
    """
    timeout = kwargs.get("deadline", timeout)
    deadline: Optional[float] = time.monotonic() + timeout if timeout else None
    error_list: List[Exception] = []
    exc_factory = partial(
        exception_factory or _build_timeout_error, timeout_val=timeout
    )

    for sleep in sleep_generator:
        # Start a new retry loop
        subgenerator: Optional[Generator[T, Any, None]] = None
        try:
            # create and yield from a new instance of the generator from input generator function
            subgenerator = target()
            return (yield from subgenerator)
        # handle exceptions raised by the subgenerator
        except Exception as exc:
            error_list.append(exc)
            if not predicate(exc):
                final_exc, source_exc = exc_factory(
                    exc_list=error_list, is_timeout=False
                )
                raise final_exc from source_exc
            if on_error is not None:
                on_error(exc)
        finally:
            if subgenerator is not None and getattr(subgenerator, "close", None):
                cast(Generator, subgenerator).close()

        if deadline is not None and time.monotonic() + sleep > deadline:
            final_exc, source_exc = exc_factory(exc_list=error_list, is_timeout=True)
            raise final_exc from source_exc
        _LOGGER.debug(
            "Retrying due to {}, sleeping {:.1f}s ...".format(error_list[-1], sleep)
        )
        time.sleep(sleep)

    raise ValueError("Sleep generator stopped yielding sleep values.")

Review comment: I'm unclear: it seems (but I haven't found many references) like the attribute is […]. Also, am I correct in understanding that the purpose of the ``close()`` check in the ``finally`` block is to ensure the generator is closed when we process an exception above? My understanding is that the […]

Reply: Good catch. I believe this is left-over from an earlier attempt, where I was trying to implement more of the generator protocol manually instead of relying on […]. I think it should be safe to remove this extra check, and the unit tests seem to back that up.
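The core loop above can be demonstrated with a trimmed, self-contained variant that runs without google-api-core installed. `exceptions.RetryError` is replaced by a plain `RuntimeError`, and the `subgenerator`/logging bookkeeping is dropped; the `flaky` source is invented for the demo and reproduces the docstring's duplicate-values behavior:

```python
import time
from functools import partial

# Stand-in for the default exception factory, with RuntimeError replacing
# google.api_core.exceptions.RetryError so this sketch is dependency-free.
def _build_timeout_error(exc_list, is_timeout, timeout_val):
    src_exc = exc_list[-1] if exc_list else None
    if is_timeout:
        return RuntimeError("Timeout of {:.1f}s exceeded".format(timeout_val)), src_exc
    return exc_list[-1], None

def retry_target_stream(target, predicate, sleep_generator, timeout=None,
                        on_error=None, exception_factory=None):
    deadline = time.monotonic() + timeout if timeout else None
    error_list = []
    exc_factory = partial(exception_factory or _build_timeout_error,
                          timeout_val=timeout)
    for sleep in sleep_generator:
        try:
            # create and yield from a fresh generator instance on each attempt
            return (yield from target())
        except Exception as exc:
            error_list.append(exc)
            if not predicate(exc):
                final_exc, source_exc = exc_factory(exc_list=error_list,
                                                    is_timeout=False)
                raise final_exc from source_exc
            if on_error is not None:
                on_error(exc)
        if deadline is not None and time.monotonic() + sleep > deadline:
            final_exc, source_exc = exc_factory(exc_list=error_list,
                                                is_timeout=True)
            raise final_exc from source_exc
        time.sleep(sleep)
    raise ValueError("Sleep generator stopped yielding sleep values.")

state = {"attempts": 0}
def flaky():
    state["attempts"] += 1
    yield 1
    yield 2
    if state["attempts"] == 1:
        raise ValueError("transient")
    yield 3

# the failed attempt restarts from scratch, so early values repeat
out = list(retry_target_stream(flaky, lambda e: isinstance(e, ValueError),
                               iter([0.0] * 5)))
```

This makes the "Important Note" from the module docstring concrete: the retried stream replays from the beginning, so callers who need exactly-once values must layer a filter (as sketched earlier) on top.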