from functools import partial

from google.api_core.retry_streaming import _build_timeout_error
- from google.api_core.retry_streaming import _TerminalException

_LOGGER = logging.getLogger(__name__)

T = TypeVar("T")


+ class _TerminalException(Exception):
+     """
+     Exception that bypasses retry logic and raises __cause__ immediately.
+     """
+     pass
+
+
async def retry_target_generator(
    target: Union[
        Callable[[], AsyncIterable[T]],
@@ -59,6 +65,74 @@ async def retry_target_generator(
    ] = None,
    **kwargs,
) -> AsyncGenerator[T, None]:
+ """
69
+ Generator wrapper for retryable streaming RPCs.
70
+ This function will be used when initilizing a retry with
71
+ ``AsyncRetry(is_stream=True)``.
72
+
73
+ When ``is_stream=False``, the target is treated as a coroutine,
74
+ and will retry when the coroutine returns an error. When ``is_stream=True``,
75
+ the target will be treated as a callable that retruns an AsyncIterable. Instead
76
+ of just wrapping the initial call in retry logic, the entire iterable is
77
+ wrapped, with each yield passing through the retryable generatpr. If any yield
78
+ in the stream raises a retryable exception, the entire stream will be
79
+ retried.
80
+
81
+ Important Note: when a stream is encounters a retryable error, it will
82
+ silently construct a fresh iterator instance in the background
83
+ and continue yielding (likely duplicate) values as if no error occurred.
84
+ This is the most general way to retry a stream, but it often is not the
85
+ desired behavior. Example: iter([1, 2, 1/0]) -> [1, 2, 1, 2, ...]
86
+
87
+ There are two ways to build more advanced retry logic for streams:
88
+
89
+     1. Wrap the target
+         Use a ``target`` that maintains state between retries, and creates a
+         different generator on each retry call. For example, you can wrap a
+         grpc call in a function that modifies the request based on what has
+         already been returned:
+
+         ```
+         async def attempt_with_modified_request(target, request, seen_items=[]):
+             # remove seen items from request on each attempt
+             new_request = modify_request(request, seen_items)
+             new_generator = await target(new_request)
+             async for item in new_generator:
+                 yield item
+                 seen_items.append(item)
+
+         retry_wrapped = AsyncRetry(is_stream=True)(attempt_with_modified_request, target, request, [])
+         ```
+
+     2. Wrap the retry generator
+         Alternatively, you can wrap the retryable generator itself before
+         passing it to the end-user to add a filter on the stream. For
+         example, you can keep track of the items that were successfully yielded
+         in previous retry attempts, and only yield new items when the
+         new attempt surpasses the previous ones:
+
+         ```
+         async def retryable_with_filter(target):
+             stream_idx = 0
+             # reset stream_idx when the stream is retried
+             def on_error(e):
+                 nonlocal stream_idx
+                 stream_idx = 0
+             # build retryable
+             retryable_gen = AsyncRetry(is_stream=True, on_error=on_error, ...)(target)
+             # keep track of what has been yielded out of filter
+             yielded_items = []
+             async for item in retryable_gen:
+                 if stream_idx >= len(yielded_items):
+                     yield item
+                     yielded_items.append(item)
+                 elif item != yielded_items[stream_idx]:
+                     raise ValueError("Stream differs from last attempt")
+                 stream_idx += 1
+
+         filter_retry_wrapped = retryable_with_filter(target)
+         ```
+     """
    subgenerator = None

    timeout = kwargs.get("deadline", timeout)
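For orientation, here is a minimal usage sketch of the pattern the new docstring describes: wrapping a callable that returns an AsyncIterable with ``AsyncRetry(is_stream=True)`` so that every yield passes through the retryable generator. It follows the invocation style of the docstring's second example; ``fetch_rows`` and the specific constructor arguments are illustrative assumptions, not part of this change.

```
# Sketch only (not part of this change). Assumes the AsyncRetry(is_stream=True)
# interface documented above; fetch_rows is a hypothetical streaming target.
from google.api_core import exceptions
from google.api_core.retry import if_exception_type
from google.api_core.retry_async import AsyncRetry


async def fetch_rows():
    # Hypothetical target: an async generator standing in for a streaming RPC.
    for i in range(3):
        yield i


async def main():
    # Retry the whole stream on transient errors; a retryable error silently
    # restarts the stream, so values may be duplicated (see the note above).
    retryable_gen = AsyncRetry(
        predicate=if_exception_type(exceptions.ServiceUnavailable),
        is_stream=True,
    )(fetch_rows)
    async for item in retryable_gen:
        print(item)
```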