Skip to content

Commit acd6546

Browse files
committed
remove wait_for in async streaming for perf reasons
1 parent c4049f5 commit acd6546

File tree

4 files changed

+82
-47
lines changed

4 files changed

+82
-47
lines changed

google/api_core/retry_streaming.py

Lines changed: 21 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -97,24 +97,39 @@ def _handle_exception(self, exc) -> None:
9797
next_attempt = datetime_helpers.utcnow() + datetime.timedelta(
9898
seconds=next_sleep
9999
)
100-
if self.deadline < next_attempt:
101-
raise exceptions.RetryError(
102-
f"Deadline of {self.timeout:.1f} seconds exceeded", exc
103-
) from exc
100+
self._check_timeout(next_attempt, exc)
104101
# sleep before retrying
105102
_LOGGER.debug(
106103
"Retrying due to {}, sleeping {:.1f}s ...".format(exc, next_sleep)
107104
)
108105
time.sleep(next_sleep)
109106
self.active_target = self.target_fn().__iter__()
110107

108+
def _check_timeout(self, current_time:float, source_exception: Optional[Exception] = None) -> None:
109+
"""
110+
Helper function to check if the timeout has been exceeded, and raise a RetryError if so.
111+
112+
Args:
113+
- current_time: the timestamp to check against the deadline
114+
- source_exception: the exception that triggered the timeout check, if any
115+
Raises:
116+
- RetryError if the deadline has been exceeded
117+
"""
118+
if self.deadline is not None and self.deadline < current_time:
119+
raise exceptions.RetryError(
120+
"Timeout of {:.1f}s exceeded".format(self.timeout),
121+
source_exception,
122+
) from source_exception
123+
111124
def __next__(self) -> T:
112125
"""
113126
Implement the iterator protocol.
114127
115128
Returns:
116129
- the next value of the active_target iterator
117130
"""
131+
# check for expired timeouts before attempting to iterate
132+
self._check_timeout(datetime_helpers.utcnow())
118133
try:
119134
return next(self.active_target)
120135
except Exception as exc:
@@ -152,6 +167,8 @@ def send(self, *args, **kwargs) -> T:
152167
Raises:
153168
- AttributeError if the active_target does not have a send() method
154169
"""
170+
# check for expired timeouts before attempting to iterate
171+
self._check_timeout(datetime_helpers.utcnow())
155172
if getattr(self.active_target, "send", None):
156173
casted_target = cast(Generator, self.active_target)
157174
try:

google/api_core/retry_streaming_async.py

Lines changed: 3 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -170,10 +170,9 @@ async def _iteration_helper(self, iteration_routine: Awaitable) -> T:
170170
# start the timer for the current operation
171171
start_timestamp = datetime_helpers.utcnow()
172172
# grab the next value from the active_target
173-
next_val_routine = asyncio.wait_for(
174-
iteration_routine, self.remaining_timeout_budget
175-
)
176-
next_val = await next_val_routine
173+
# Note: interrupting with asyncio.wait_for is expensive,
174+
# so we only check for timeouts at the start of each iteration
175+
next_val = await iteration_routine
177176
# subtract the time spent waiting for the next value from the
178177
# remaining timeout budget
179178
self._subtract_time_from_budget(start_timestamp)

tests/asyncio/test_retry_async.py

Lines changed: 21 additions & 38 deletions
Original file line numberDiff line numberDiff line change
@@ -540,44 +540,6 @@ def increase_time(sleep_delay):
540540
assert last_wait == 4
541541
assert total_wait == 7
542542

543-
@pytest.mark.asyncio
544-
async def test___call___generator_timeout_cancellations(self):
545-
"""
546-
Tests that a retry-decorated generator will throw a RetryError
547-
after using its time budget
548-
"""
549-
on_error = mock.Mock(return_value=None)
550-
retry_ = retry_async.AsyncRetry(
551-
predicate=retry_async.if_exception_type(ValueError),
552-
deadline=0.2,
553-
is_stream=True,
554-
)
555-
utcnow = datetime.datetime.utcnow()
556-
utcnow_patcher = mock.patch(
557-
"google.api_core.datetime_helpers.utcnow", return_value=utcnow
558-
)
559-
# ensure generator times out when awaiting past deadline
560-
with pytest.raises(exceptions.RetryError):
561-
infinite_gen = retry_(self._generator_mock, on_error)(sleep_time=60)
562-
await infinite_gen.__anext__()
563-
# ensure time between yields isn't counted
564-
with utcnow_patcher as patched_utcnow:
565-
generator = retry_(self._generator_mock)(sleep_time=0.05)
566-
assert await generator.__anext__() == 0
567-
patched_utcnow.return_value += datetime.timedelta(20)
568-
assert await generator.__anext__() == 1
569-
# ensure timeout budget is tracked
570-
generator = retry_(self._generator_mock)(sleep_time=0.07)
571-
assert await generator.__anext__() == 0
572-
assert await generator.__anext__() == 1
573-
with pytest.raises(exceptions.RetryError) as exc:
574-
await generator.__anext__()
575-
assert "Timeout of 0.2s exceeded" in str(exc.value)
576-
# subsequent calls should also return a RetryError
577-
with pytest.raises(exceptions.RetryError) as exc:
578-
await generator.__anext__()
579-
assert "Timeout of 0.2s exceeded" in str(exc.value)
580-
581543
@pytest.mark.asyncio
582544
async def test___call___generator_await_cancel_retryable(self):
583545
"""
@@ -794,3 +756,24 @@ async def __anext__(self):
794756
assert await retryable.__anext__() == 3
795757
with pytest.raises(StopAsyncIteration):
796758
await retryable.__anext__()
759+
760+
@pytest.mark.asyncio
761+
async def test_iterate_stream_after_deadline(self):
762+
"""
763+
Streaming retries should raise RetryError when calling next or send after deadline has passed
764+
"""
765+
retry_ = retry_async.AsyncRetry(is_stream=True, deadline=0.01)
766+
decorated = retry_(self._generator_mock)
767+
generator = decorated(10)
768+
starting_time_budget = generator.remaining_timeout_budget
769+
assert starting_time_budget == 0.01
770+
await generator.__anext__()
771+
# ensure budget is used on each call
772+
assert generator.remaining_timeout_budget < starting_time_budget
773+
# simulate using up budget
774+
generator.remaining_timeout_budget = 0
775+
with pytest.raises(exceptions.RetryError):
776+
await generator.__anext__()
777+
with pytest.raises(exceptions.RetryError):
778+
await generator.asend("test")
779+

tests/unit/test_retry.py

Lines changed: 37 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -304,7 +304,7 @@ def test_with_delay_partial_options(self):
304304
assert retry_ is not new_retry
305305
assert new_retry._initial == 1
306306
assert new_retry._maximum == 4
307-
assert new_retry._multiplier == 3
307+
assert n ew_retry._multiplier == 3
308308

309309
new_retry = retry_.with_delay(multiplier=4)
310310
assert retry_ is not new_retry
@@ -768,3 +768,39 @@ def test___call___with_is_stream(self, sleep):
768768
gen = gen_retry_(wrapped)()
769769
unpacked = [next(gen) for i in range(10)]
770770
assert unpacked == [0, 1, 2, 3, 4, 5, 0, 1, 2, 3]
771+
772+
def test_iterate_stream_after_deadline(self):
773+
"""
774+
Streaming retries should raise RetryError when calling next after deadline has passed
775+
"""
776+
from time import sleep
777+
retry_ = retry.Retry(
778+
predicate=retry.if_exception_type(ValueError),
779+
is_stream=True,
780+
deadline=0.01,
781+
)
782+
decorated = retry_(self._generator_mock)
783+
generator = decorated(10)
784+
next(generator)
785+
sleep(0.02)
786+
with pytest.raises(exceptions.RetryError):
787+
next(generator)
788+
789+
def test_iterate_stream_send_after_deadline(self):
790+
"""
791+
Streaming retries should raise RetryError when calling send after deadline has passed
792+
"""
793+
from time import sleep
794+
retry_ = retry.Retry(
795+
predicate=retry.if_exception_type(ValueError),
796+
is_stream=True,
797+
deadline=0.01,
798+
)
799+
decorated = retry_(self._generator_mock)
800+
generator = decorated(10)
801+
next(generator)
802+
generator.send("test")
803+
sleep(0.02)
804+
with pytest.raises(exceptions.RetryError):
805+
generator.send("test")
806+
 

0 commit comments

Comments
 (0)