Skip to content

Commit 4cdee6b

Browse files
committed
improved docstrings
1 parent de7b51a commit 4cdee6b

File tree

4 files changed

+138
-6
lines changed

4 files changed

+138
-6
lines changed

google/api_core/retry.py

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -311,6 +311,9 @@ class Retry(object):
311311
will be wrapped with retry logic, and any failed outputs will
312312
restart the stream. If False, only the input function call itself
313313
will be retried. Defaults to False.
314+
To avoid duplicate values, retryable streams should typically be
315+
wrapped in additional filter logic before use. For more details, see
316+
``google/api_core/retry_streaming.RetryaleGenerator``.
314317
deadline (float): DEPRECATED: use `timeout` instead. For backward
315318
compatibility, if specified it will override the ``timeout`` parameter.
316319
"""

google/api_core/retry_async.py

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -173,8 +173,6 @@ class AsyncRetry:
173173
maximum (float): The maximum amount of time to delay in seconds.
174174
multiplier (float): The multiplier applied to the delay.
175175
timeout (float): How long to keep retrying in seconds.
176-
When ``is_stream``, only time spent waiting on the
177-
target or sleeping between retries is counted towards the timeout.
178176
on_error (Callable[Exception]): A function to call while processing
179177
a retryable exception. Any error raised by this function will
180178
*not* be caught.
@@ -184,6 +182,9 @@ class AsyncRetry:
184182
If True, the iterable will be wrapped with retry logic, and any
185183
failed outputs will restart the stream. If False, only the input
186184
function call itself will be retried. Defaults to False.
185+
To avoid duplicate values, retryable streams should typically be
186+
wrapped in additional filter logic before use. For more details, see
187+
``google.api_core.retry_streaming_async.AsyncRetryableGenerator``.
187188
deadline (float): DEPRECATED use ``timeout`` instead. If set it will
188189
override ``timeout`` parameter.
189190
"""

google/api_core/retry_streaming.py

Lines changed: 66 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -30,8 +30,72 @@
3030

3131
class RetryableGenerator(Generator[T, Any, None]):
3232
"""
33-
Helper class for retrying Iterator and Generator-based
34-
streaming APIs.
33+
Generator wrapper for retryable streaming RPCs.
34+
RetryableGenerator will be used when initilizing a retry with
35+
``Retry(is_stream=True)``.
36+
37+
When ``is_stream=False``, the target is treated as a callable,
38+
and will retry when the callable returns an error. When ``is_stream=True``,
39+
the target will be treated as a callable that retruns an iterable. Instead
40+
of just wrapping the initial call in retry logic, the entire iterable is
41+
wrapped, with each yield passing through RetryableGenerator. If any yield
42+
in the stream raises a retryable exception, the entire stream will be
43+
retried.
44+
45+
Important Note: when a stream is encounters a retryable error, it will
46+
silently construct a fresh iterator instance in the background
47+
and continue yielding (likely duplicate) values as if no error occurred.
48+
This is the most general way to retry a stream, but it often is not the
49+
desired behavior. Example: iter([1, 2, 1/0]) -> [1, 2, 1, 2, ...]
50+
51+
There are two ways to build more advanced retry logic for streams:
52+
53+
1. Wrap the target
54+
Use a ``target`` that maintains state between retries, and creates a
55+
different generator on each retry call. For example, you can wrap a
56+
network call in a function that modifies the request based on what has
57+
already been returned:
58+
59+
```
60+
def attempt_with_modified_request(target, request, seen_items=[]):
61+
# remove seen items from request on each attempt
62+
new_request = modify_request(request, seen_items)
63+
new_generator = target(new_request)
64+
for item in new_generator:
65+
yield item
66+
seen_items.append(item)
67+
68+
retry_wrapped = Retry(is_stream=True)(attempt_with_modified_request, target, request, [])
69+
```
70+
71+
2. Wrap the RetryableGenerator
72+
Alternatively, you can wrap the RetryableGenerator itself before
73+
passing it to the end-user to add a filter on the stream. For
74+
example, you can keep track of the items that were successfully yielded
75+
in previous retry attempts, and only yield new items when the
76+
new attempt surpasses the previous ones:
77+
78+
``
79+
def retryable_with_filter(target):
80+
stream_idx = 0
81+
# reset stream_idx when the stream is retried
82+
def on_error(e):
83+
nonlocal stream_idx
84+
stream_idx = 0
85+
# build retryable
86+
retryable_gen = Retry(is_stream=True, on_error=on_error, ...)(target)
87+
# keep track of what has been yielded out of filter
88+
yielded_items = []
89+
for item in retryable_gen:
90+
if stream_idx >= len(yielded_items):
91+
yield item
92+
yielded_items.append(item)
93+
elif item != previous_stream[stream_idx]:
94+
raise ValueError("Stream differs from last attempt")"
95+
stream_idx += 1
96+
97+
filter_retry_wrapped = retryable_with_filter(target)
98+
```
3599
"""
36100

37101
def __init__(

google/api_core/retry_streaming_async.py

Lines changed: 66 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -41,8 +41,72 @@
4141

4242
class AsyncRetryableGenerator(AsyncGenerator[T, None]):
4343
"""
44-
Helper class for retrying AsyncIterator and AsyncGenerator-based
45-
streaming APIs.
44+
AsyncGenerator wrapper for retryable streaming RPCs.
45+
AsyncRetryableGenerator will be used when initilizing a retry with
46+
``AsyncRetry(is_stream=True)``.
47+
48+
When ``is_stream=False``, the target is treated as a coroutine,
49+
and will retry when the coroutine returns an error. When ``is_stream=True``,
50+
the target will be treated as a callable that retruns an AsyncIterable. Instead
51+
of just wrapping the initial call in retry logic, the entire iterable is
52+
wrapped, with each yield passing through AsyncRetryableGenerator. If any yield
53+
in the stream raises a retryable exception, the entire stream will be
54+
retried.
55+
56+
Important Note: when a stream is encounters a retryable error, it will
57+
silently construct a fresh iterator instance in the background
58+
and continue yielding (likely duplicate) values as if no error occurred.
59+
This is the most general way to retry a stream, but it often is not the
60+
desired behavior. Example: iter([1, 2, 1/0]) -> [1, 2, 1, 2, ...]
61+
62+
There are two ways to build more advanced retry logic for streams:
63+
64+
1. Wrap the target
65+
Use a ``target`` that maintains state between retries, and creates a
66+
different generator on each retry call. For example, you can wrap a
67+
grpc call in a function that modifies the request based on what has
68+
already been returned:
69+
70+
```
71+
async def attempt_with_modified_request(target, request, seen_items=[]):
72+
# remove seen items from request on each attempt
73+
new_request = modify_request(request, seen_items)
74+
new_generator = await target(new_request)
75+
async for item in new_generator:
76+
yield item
77+
seen_items.append(item)
78+
79+
retry_wrapped = AsyncRetry(is_stream=True)(attempt_with_modified_request, target, request, [])
80+
```
81+
82+
2. Wrap the AsyncRetryableGenerator
83+
Alternatively, you can wrap the AsyncRetryableGenerator itself before
84+
passing it to the end-user to add a filter on the stream. For
85+
example, you can keep track of the items that were successfully yielded
86+
in previous retry attempts, and only yield new items when the
87+
new attempt surpasses the previous ones:
88+
89+
``
90+
async def retryable_with_filter(target):
91+
stream_idx = 0
92+
# reset stream_idx when the stream is retried
93+
def on_error(e):
94+
nonlocal stream_idx
95+
stream_idx = 0
96+
# build retryable
97+
retryable_gen = AsyncRetry(is_stream=True, on_error=on_error, ...)(target)
98+
# keep track of what has been yielded out of filter
99+
yielded_items = []
100+
async for item in retryable_gen:
101+
if stream_idx >= len(yielded_items):
102+
yield item
103+
yielded_items.append(item)
104+
elif item != previous_stream[stream_idx]:
105+
raise ValueError("Stream differs from last attempt")"
106+
stream_idx += 1
107+
108+
filter_retry_wrapped = retryable_with_filter(target)
109+
```
46110
"""
47111

48112
def __init__(

0 commit comments

Comments
 (0)