Skip to content

Commit 93f82cc

Browse files
committed
improved documentation
1 parent 343157b commit 93f82cc

File tree

2 files changed

+208
-133
lines changed

2 files changed

+208
-133
lines changed

google/api_core/retry_streaming.py

Lines changed: 103 additions & 65 deletions
Original file line numberDiff line numberDiff line change
@@ -12,7 +12,74 @@
1212
# See the License for the specific language governing permissions and
1313
# limitations under the License.
1414

15-
"""Helpers for retries for streaming APIs."""
15+
"""
16+
Generator wrapper for retryable streaming RPCs.
17+
This function will be used when initilizing a retry with
18+
``Retry(is_stream=True)``.
19+
20+
When ``is_stream=False``, the target is treated as a callable,
21+
and will retry when the callable returns an error. When ``is_stream=True``,
22+
the target will be treated as a callable that retruns an iterable. Instead
23+
of just wrapping the initial call in retry logic, the entire iterable is
24+
wrapped, with each yield passing through the retryable generator. If any yield
25+
in the stream raises a retryable exception, the entire stream will be
26+
retried.
27+
28+
Important Note: when a stream is encounters a retryable error, it will
29+
silently construct a fresh iterator instance in the background
30+
and continue yielding (likely duplicate) values as if no error occurred.
31+
This is the most general way to retry a stream, but it often is not the
32+
desired behavior. Example: iter([1, 2, 1/0]) -> [1, 2, 1, 2, ...]
33+
34+
There are two ways to build more advanced retry logic for streams:
35+
36+
1. Wrap the target
37+
Use a ``target`` that maintains state between retries, and creates a
38+
different generator on each retry call. For example, you can wrap a
39+
network call in a function that modifies the request based on what has
40+
already been returned:
41+
42+
```
43+
def attempt_with_modified_request(target, request, seen_items=[]):
44+
# remove seen items from request on each attempt
45+
new_request = modify_request(request, seen_items)
46+
new_generator = target(new_request)
47+
for item in new_generator:
48+
yield item
49+
seen_items.append(item)
50+
51+
retry_wrapped = Retry(is_stream=True)(attempt_with_modified_request, target, request, [])
52+
```
53+
54+
2. Wrap the retry generator
55+
Alternatively, you can wrap the retryable generator itself before
56+
passing it to the end-user to add a filter on the stream. For
57+
example, you can keep track of the items that were successfully yielded
58+
in previous retry attempts, and only yield new items when the
59+
new attempt surpasses the previous ones:
60+
61+
``
62+
def retryable_with_filter(target):
63+
stream_idx = 0
64+
# reset stream_idx when the stream is retried
65+
def on_error(e):
66+
nonlocal stream_idx
67+
stream_idx = 0
68+
# build retryable
69+
retryable_gen = Retry(is_stream=True, on_error=on_error, ...)(target)
70+
# keep track of what has been yielded out of filter
71+
yielded_items = []
72+
for item in retryable_gen:
73+
if stream_idx >= len(yielded_items):
74+
yield item
75+
yielded_items.append(item)
76+
elif item != previous_stream[stream_idx]:
77+
raise ValueError("Stream differs from last attempt")"
78+
stream_idx += 1
79+
80+
filter_retry_wrapped = retryable_with_filter(target)
81+
```
82+
"""
1683

1784
from typing import (
1885
Callable,
@@ -77,74 +144,45 @@ def retry_target_stream(
77144
] = None,
78145
**kwargs,
79146
) -> Generator[T, Any, None]:
80-
"""
81-
Generator wrapper for retryable streaming RPCs.
82-
This function will be used when initilizing a retry with
83-
``Retry(is_stream=True)``.
84-
85-
When ``is_stream=False``, the target is treated as a callable,
86-
and will retry when the callable returns an error. When ``is_stream=True``,
87-
the target will be treated as a callable that retruns an iterable. Instead
88-
of just wrapping the initial call in retry logic, the entire iterable is
89-
wrapped, with each yield passing through the retryable generator. If any yield
90-
in the stream raises a retryable exception, the entire stream will be
91-
retried.
92-
93-
Important Note: when a stream is encounters a retryable error, it will
94-
silently construct a fresh iterator instance in the background
95-
and continue yielding (likely duplicate) values as if no error occurred.
96-
This is the most general way to retry a stream, but it often is not the
97-
desired behavior. Example: iter([1, 2, 1/0]) -> [1, 2, 1, 2, ...]
98-
99-
There are two ways to build more advanced retry logic for streams:
100-
101-
1. Wrap the target
102-
Use a ``target`` that maintains state between retries, and creates a
103-
different generator on each retry call. For example, you can wrap a
104-
network call in a function that modifies the request based on what has
105-
already been returned:
147+
"""Create a generator wrapper that retries the wrapped stream if it fails.
106148
107-
```
108-
def attempt_with_modified_request(target, request, seen_items=[]):
109-
# remove seen items from request on each attempt
110-
new_request = modify_request(request, seen_items)
111-
new_generator = target(new_request)
112-
for item in new_generator:
113-
yield item
114-
seen_items.append(item)
115-
116-
retry_wrapped = Retry(is_stream=True)(attempt_with_modified_request, target, request, [])
117-
```
149+
This is the lowest-level retry helper. Generally, you'll use the
150+
higher-level retry helper :class:`Retry`.
118151
119-
2. Wrap the retry generator
120-
Alternatively, you can wrap the retryable generator itself before
121-
passing it to the end-user to add a filter on the stream. For
122-
example, you can keep track of the items that were successfully yielded
123-
in previous retry attempts, and only yield new items when the
124-
new attempt surpasses the previous ones:
152+
Args:
153+
target: The generator function to call and retry. This must be a
154+
nullary function - apply arguments with `functools.partial`.
155+
predicate: A callable used to determine if an
156+
exception raised by the target should be considered retryable.
157+
It should return True to retry or False otherwise.
158+
sleep_generator: An infinite iterator that determines
159+
how long to sleep between retries.
160+
timeout: How long to keep retrying the target.
161+
Note: timeout is only checked before initiating a retry, so the target may
162+
run past the timeout value as long as it is healthy.
163+
on_error: A function to call while processing a
164+
retryable exception. Any error raised by this function will *not*
165+
be caught.
166+
exception_factory: A function that is called when the retryable reaches
167+
a terminal failure state, used to construct an exception to be raised.
168+
It it given a list of all exceptions encountered, a boolean indicating
169+
whether the failure was due to a timeout, and the original timeout value
170+
as arguments. It should return a tuple of the exception to be raised,
171+
along with the cause exception if any.
172+
If not provided, a default implementation will raise a RetryError
173+
on timeout, or the last exception encountered otherwise.
125174
126-
``
127-
def retryable_with_filter(target):
128-
stream_idx = 0
129-
# reset stream_idx when the stream is retried
130-
def on_error(e):
131-
nonlocal stream_idx
132-
stream_idx = 0
133-
# build retryable
134-
retryable_gen = Retry(is_stream=True, on_error=on_error, ...)(target)
135-
# keep track of what has been yielded out of filter
136-
yielded_items = []
137-
for item in retryable_gen:
138-
if stream_idx >= len(yielded_items):
139-
yield item
140-
yielded_items.append(item)
141-
elif item != previous_stream[stream_idx]:
142-
raise ValueError("Stream differs from last attempt")"
143-
stream_idx += 1
144-
145-
filter_retry_wrapped = retryable_with_filter(target)
146-
```
175+
Returns:
176+
Generator: A retryable generator that wraps the target generator function.
177+
178+
Raises:
179+
ValueError: If the sleep generator stops yielding values.
180+
Exception: a custom exception specified by the exception_factory if provided.
181+
If no exception_factory is provided:
182+
google.api_core.RetryError: If the deadline is exceeded while retrying.
183+
Exception: If the target raises an error that isn't retryable.
147184
"""
185+
148186
timeout = kwargs.get("deadline", timeout)
149187
deadline: Optional[float] = time.monotonic() + timeout if timeout else None
150188
error_list: List[Exception] = []

google/api_core/retry_streaming_async.py

Lines changed: 105 additions & 68 deletions
Original file line numberDiff line numberDiff line change
@@ -11,8 +11,74 @@
1111
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
1212
# See the License for the specific language governing permissions and
1313
# limitations under the License.
14-
15-
"""Helpers for retries for async streaming APIs."""
14+
"""
15+
Generator wrapper for retryable streaming RPCs.
16+
This function will be used when initilizing a retry with
17+
``AsyncRetry(is_stream=True)``.
18+
19+
When ``is_stream=False``, the target is treated as a coroutine,
20+
and will retry when the coroutine returns an error. When ``is_stream=True``,
21+
the target will be treated as a callable that retruns an AsyncIterable. Instead
22+
of just wrapping the initial call in retry logic, the entire iterable is
23+
wrapped, with each yield passing through the retryable generatpr. If any yield
24+
in the stream raises a retryable exception, the entire stream will be
25+
retried.
26+
27+
Important Note: when a stream is encounters a retryable error, it will
28+
silently construct a fresh iterator instance in the background
29+
and continue yielding (likely duplicate) values as if no error occurred.
30+
This is the most general way to retry a stream, but it often is not the
31+
desired behavior. Example: iter([1, 2, 1/0]) -> [1, 2, 1, 2, ...]
32+
33+
There are two ways to build more advanced retry logic for streams:
34+
35+
1. Wrap the target
36+
Use a ``target`` that maintains state between retries, and creates a
37+
different generator on each retry call. For example, you can wrap a
38+
grpc call in a function that modifies the request based on what has
39+
already been returned:
40+
41+
```
42+
async def attempt_with_modified_request(target, request, seen_items=[]):
43+
# remove seen items from request on each attempt
44+
new_request = modify_request(request, seen_items)
45+
new_generator = await target(new_request)
46+
async for item in new_generator:
47+
yield item
48+
seen_items.append(item)
49+
50+
retry_wrapped = AsyncRetry(is_stream=True)(attempt_with_modified_request, target, request, [])
51+
```
52+
53+
2. Wrap the retry generator
54+
Alternatively, you can wrap the retryable generator itself before
55+
passing it to the end-user to add a filter on the stream. For
56+
example, you can keep track of the items that were successfully yielded
57+
in previous retry attempts, and only yield new items when the
58+
new attempt surpasses the previous ones:
59+
60+
``
61+
async def retryable_with_filter(target):
62+
stream_idx = 0
63+
# reset stream_idx when the stream is retried
64+
def on_error(e):
65+
nonlocal stream_idx
66+
stream_idx = 0
67+
# build retryable
68+
retryable_gen = AsyncRetry(is_stream=True, on_error=on_error, ...)(target)
69+
# keep track of what has been yielded out of filter
70+
yielded_items = []
71+
async for item in retryable_gen:
72+
if stream_idx >= len(yielded_items):
73+
yield item
74+
yielded_items.append(item)
75+
elif item != previous_stream[stream_idx]:
76+
raise ValueError("Stream differs from last attempt")"
77+
stream_idx += 1
78+
79+
filter_retry_wrapped = retryable_with_filter(target)
80+
```
81+
"""
1682

1783
from typing import (
1884
cast,
@@ -56,74 +122,45 @@ async def retry_target_stream(
56122
] = None,
57123
**kwargs,
58124
) -> AsyncGenerator[T, None]:
125+
"""Create a generator wrapper that retries the wrapped stream if it fails.
126+
127+
This is the lowest-level retry helper. Generally, you'll use the
128+
higher-level retry helper :class:`Retry`.
129+
130+
Args:
131+
target: The generator function to call and retry. This must be a
132+
nullary function - apply arguments with `functools.partial`.
133+
predicate: A callable used to determine if an
134+
exception raised by the target should be considered retryable.
135+
It should return True to retry or False otherwise.
136+
sleep_generator: An infinite iterator that determines
137+
how long to sleep between retries.
138+
timeout: How long to keep retrying the target.
139+
Note: timeout is only checked before initiating a retry, so the target may
140+
run past the timeout value as long as it is healthy.
141+
on_error: A function to call while processing a
142+
retryable exception. Any error raised by this function will *not*
143+
be caught.
144+
exception_factory: A function that is called when the retryable reaches
145+
a terminal failure state, used to construct an exception to be raised.
146+
It it given a list of all exceptions encountered, a boolean indicating
147+
whether the failure was due to a timeout, and the original timeout value
148+
as arguments. It should return a tuple of the exception to be raised,
149+
along with the cause exception if any.
150+
If not provided, a default implementation will raise a RetryError
151+
on timeout, or the last exception encountered otherwise.
152+
153+
Returns:
154+
AssyncGenerator: A retryable generator that wraps the target generator function.
155+
156+
Raises:
157+
ValueError: If the sleep generator stops yielding values.
158+
Exception: a custom exception specified by the exception_factory if provided.
159+
If no exception_factory is provided:
160+
google.api_core.RetryError: If the deadline is exceeded while retrying.
161+
Exception: If the target raises an error that isn't retryable.
59162
"""
60-
Generator wrapper for retryable streaming RPCs.
61-
This function will be used when initilizing a retry with
62-
``AsyncRetry(is_stream=True)``.
63-
64-
When ``is_stream=False``, the target is treated as a coroutine,
65-
and will retry when the coroutine returns an error. When ``is_stream=True``,
66-
the target will be treated as a callable that retruns an AsyncIterable. Instead
67-
of just wrapping the initial call in retry logic, the entire iterable is
68-
wrapped, with each yield passing through the retryable generatpr. If any yield
69-
in the stream raises a retryable exception, the entire stream will be
70-
retried.
71-
72-
Important Note: when a stream is encounters a retryable error, it will
73-
silently construct a fresh iterator instance in the background
74-
and continue yielding (likely duplicate) values as if no error occurred.
75-
This is the most general way to retry a stream, but it often is not the
76-
desired behavior. Example: iter([1, 2, 1/0]) -> [1, 2, 1, 2, ...]
77-
78-
There are two ways to build more advanced retry logic for streams:
79-
80-
1. Wrap the target
81-
Use a ``target`` that maintains state between retries, and creates a
82-
different generator on each retry call. For example, you can wrap a
83-
grpc call in a function that modifies the request based on what has
84-
already been returned:
85163

86-
```
87-
async def attempt_with_modified_request(target, request, seen_items=[]):
88-
# remove seen items from request on each attempt
89-
new_request = modify_request(request, seen_items)
90-
new_generator = await target(new_request)
91-
async for item in new_generator:
92-
yield item
93-
seen_items.append(item)
94-
95-
retry_wrapped = AsyncRetry(is_stream=True)(attempt_with_modified_request, target, request, [])
96-
```
97-
98-
2. Wrap the retry generator
99-
Alternatively, you can wrap the retryable generator itself before
100-
passing it to the end-user to add a filter on the stream. For
101-
example, you can keep track of the items that were successfully yielded
102-
in previous retry attempts, and only yield new items when the
103-
new attempt surpasses the previous ones:
104-
105-
``
106-
async def retryable_with_filter(target):
107-
stream_idx = 0
108-
# reset stream_idx when the stream is retried
109-
def on_error(e):
110-
nonlocal stream_idx
111-
stream_idx = 0
112-
# build retryable
113-
retryable_gen = AsyncRetry(is_stream=True, on_error=on_error, ...)(target)
114-
# keep track of what has been yielded out of filter
115-
yielded_items = []
116-
async for item in retryable_gen:
117-
if stream_idx >= len(yielded_items):
118-
yield item
119-
yielded_items.append(item)
120-
elif item != previous_stream[stream_idx]:
121-
raise ValueError("Stream differs from last attempt")"
122-
stream_idx += 1
123-
124-
filter_retry_wrapped = retryable_with_filter(target)
125-
```
126-
"""
127164
subgenerator: Optional[AsyncIterator[T]] = None
128165
timeout = kwargs.get("deadline", timeout)
129166
deadline: Optional[float] = time.monotonic() + timeout if timeout else None

0 commit comments

Comments
 (0)