Skip to content

Commit 7515c07

Browse files
plamut, sduskis
authored and committed
Core: Mitigate busy reopen loop in ResumableBidiRpc consuming 100% CPU (#8193)
* Add bidi._Throttle helper class
* Add optional reopen throttling to ResumableBidiRpc
* Enable Bidi reopen throttling in SPM
* Change bidi._Throttle signature

The commit renames the entry_cap parameter to access_limit, and changes the type of the time_window argument from float to timedelta.
1 parent 2675514 commit 7515c07

File tree

2 files changed

+191
-3
lines changed

2 files changed

+191
-3
lines changed

google/api_core/bidi.py

Lines changed: 93 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -14,8 +14,11 @@
1414

1515
"""Bi-directional streaming RPC helpers."""
1616

17+
import collections
18+
import datetime
1719
import logging
1820
import threading
21+
import time
1922

2023
from six.moves import queue
2124

@@ -134,6 +137,73 @@ def __iter__(self):
134137
yield item
135138

136139

140+
class _Throttle(object):
    """A context manager limiting the total entries in a sliding time window.

    If more than ``access_limit`` attempts are made to enter the context manager
    instance in the last ``time_window`` interval, the exceeding requests block
    until enough time elapses.

    The context manager instances are thread-safe and can be shared between
    multiple threads. If multiple requests are blocked and waiting to enter,
    the exact order in which they are allowed to proceed is not determined.

    Example::

        max_three_per_second = _Throttle(
            access_limit=3, time_window=datetime.timedelta(seconds=1)
        )

        for i in range(5):
            with max_three_per_second as time_waited:
                print("{}: Waited {} seconds to enter".format(i, time_waited))

    Args:
        access_limit (int): the maximum number of entries allowed in the time window
        time_window (datetime.timedelta): the width of the sliding time window

    Raises:
        ValueError: If ``access_limit`` is less than 1, or if ``time_window``
            is not a positive timedelta.
    """

    def __init__(self, access_limit, time_window):
        if access_limit < 1:
            raise ValueError("access_limit argument must be positive")

        if time_window <= datetime.timedelta(0):
            raise ValueError("time_window argument must be a positive timedelta")

        self._time_window = time_window
        self._window_seconds = time_window.total_seconds()
        self._access_limit = access_limit
        # Timestamps (clock readings, in seconds) of the most recent entries,
        # least recent first. ``maxlen`` makes an append on a full deque
        # silently discard the oldest timestamp.
        self._past_entries = collections.deque(maxlen=access_limit)
        self._entry_lock = threading.Lock()
        # Intervals are measured with a monotonic clock where one exists, so
        # that wall-clock adjustments (DST changes, NTP steps, manual resets)
        # cannot make past entries look like they happened in the future and
        # stall callers for far longer than ``time_window``. Interpreters
        # without ``time.monotonic`` fall back to ``time.time``.
        self._clock = getattr(time, "monotonic", time.time)

    def __enter__(self):
        """Enter the context, blocking first if the access limit is exhausted.

        Returns:
            float: The number of seconds spent waiting before the entry was
            allowed (``0.0`` if no waiting was needed).
        """
        with self._entry_lock:
            cutoff_time = self._clock() - self._window_seconds

            # drop the entries that are too old, as they are no longer relevant
            while self._past_entries and self._past_entries[0] < cutoff_time:
                self._past_entries.popleft()

            if len(self._past_entries) < self._access_limit:
                self._past_entries.append(self._clock())
                return 0.0  # no waiting was needed

            # The window is full: wait until its oldest entry slides out. The
            # wait is capped at one full window, which only matters when the
            # non-monotonic fallback clock has been moved backwards.
            to_wait = min(self._past_entries[0] - cutoff_time, self._window_seconds)
            time.sleep(to_wait)

            self._past_entries.append(self._clock())
            return to_wait

    def __exit__(self, *_):
        """Leave the context; there is nothing to release or clean up."""
        pass

    def __repr__(self):
        return "{}(access_limit={}, time_window={})".format(
            self.__class__.__name__,
            self._access_limit,
            repr(self._time_window),
        )
205+
206+
137207
class BidiRpc(object):
138208
"""A helper for consuming a bi-directional streaming RPC.
139209
@@ -323,15 +393,31 @@ def should_recover(exc):
323393
whenever an error is encountered on the stream.
324394
metadata Sequence[Tuple(str, str)]: RPC metadata to include in
325395
the request.
396+
throttle_reopen (bool): If ``True``, throttling will be applied to
397+
stream reopen calls. Defaults to ``False``.
326398
"""
327399

328-
def __init__(self, start_rpc, should_recover, initial_request=None, metadata=None):
400+
def __init__(
    self,
    start_rpc,
    should_recover,
    initial_request=None,
    metadata=None,
    throttle_reopen=False,
):
    """Set up the resumable RPC and, optionally, its reopen throttle.

    ``start_rpc``, ``initial_request`` and ``metadata`` are forwarded to the
    parent class unchanged; ``should_recover`` is stored for later use.
    When ``throttle_reopen`` is true, a throttle allowing at most 5 entries
    per 10 seconds is created and kept in ``_reopen_throttle``; otherwise
    that attribute is ``None``.
    """
    super(ResumableBidiRpc, self).__init__(start_rpc, initial_request, metadata)
    self._should_recover = should_recover
    self._operational_lock = threading.RLock()
    self._finalized = False
    self._finalize_lock = threading.Lock()

    # Throttling is opt-in: without it there is no throttle object at all.
    self._reopen_throttle = None
    if throttle_reopen:
        self._reopen_throttle = _Throttle(
            access_limit=5, time_window=datetime.timedelta(seconds=10)
        )
420+
335421
def _finalize(self, result):
336422
with self._finalize_lock:
337423
if self._finalized:
@@ -374,7 +460,11 @@ def _reopen(self):
374460
# retryable error.
375461

376462
try:
377-
self.open()
463+
if self._reopen_throttle:
464+
with self._reopen_throttle:
465+
self.open()
466+
else:
467+
self.open()
378468
# If re-opening or re-calling the method fails for any reason,
379469
# consider it a terminal error and finalize the stream.
380470
except Exception as exc:
@@ -573,7 +663,7 @@ def start(self):
573663
thread = threading.Thread(
574664
name=_BIDIRECTIONAL_CONSUMER_NAME,
575665
target=self._thread_main,
576-
args=(ready,)
666+
args=(ready,),
577667
)
578668
thread.daemon = True
579669
thread.start()

tests/unit/test_bidi.py

Lines changed: 98 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,7 @@
1212
# See the License for the specific language governing permissions and
1313
# limitations under the License.
1414

15+
import datetime
1516
import logging
1617
import threading
1718

@@ -116,6 +117,87 @@ def test_exit_with_stop(self):
116117
assert items == []
117118

118119

120+
class Test_Throttle(object):
    """Unit tests for the ``bidi._Throttle`` context manager."""

    def test_repr(self):
        window = datetime.timedelta(seconds=4.5)
        throttle = bidi._Throttle(access_limit=42, time_window=window)

        expected = "_Throttle(access_limit=42, time_window={})".format(repr(window))
        assert repr(throttle) == expected

    def test_raises_error_on_invalid_init_arguments(self):
        # A zero-width time window must be rejected ...
        zero_window = datetime.timedelta(seconds=0.0)
        with pytest.raises(ValueError) as exc_info:
            bidi._Throttle(access_limit=10, time_window=zero_window)
        message = str(exc_info.value)
        assert "time_window" in message
        assert "must be a positive timedelta" in message

        # ... and so must a non-positive access limit.
        ten_seconds = datetime.timedelta(seconds=10)
        with pytest.raises(ValueError) as exc_info:
            bidi._Throttle(access_limit=0, time_window=ten_seconds)
        message = str(exc_info.value)
        assert "access_limit" in message
        assert "must be positive" in message

    def test_does_not_delay_entry_attempts_under_threshold(self):
        one_second = datetime.timedelta(seconds=1)
        throttle = bidi._Throttle(access_limit=3, time_window=one_second)
        entries = []

        for _ in range(3):
            with throttle as time_waited:
                entries.append(
                    {
                        "entered_at": datetime.datetime.now(),
                        "reported_wait": time_waited,
                    }
                )

        # None of the entries should report any waiting ...
        assert all(entry["reported_wait"] == 0.0 for entry in entries)

        # ... and consecutive entries should actually be close together in time.
        for earlier, later in zip(entries, entries[1:]):
            elapsed = later["entered_at"] - earlier["entered_at"]
            assert elapsed.total_seconds() < 0.1

    def test_delays_entry_attempts_above_threshold(self):
        one_second = datetime.timedelta(seconds=1)
        throttle = bidi._Throttle(access_limit=3, time_window=one_second)
        entries = []

        for _ in range(6):
            with throttle as time_waited:
                entries.append(
                    {
                        "entered_at": datetime.datetime.now(),
                        "reported_wait": time_waited,
                    }
                )

        # At most 3 entries are allowed per time_window, so any entry and the
        # one made three attempts before it must be more than a full
        # time_window apart.
        for earlier, later in zip(entries, entries[3:]):
            elapsed = later["entered_at"] - earlier["entered_at"]
            assert elapsed.total_seconds() > 1.0

        # Only the fourth entry (index 3) is expected to report a wait.
        # (NOTE: not using assert all(...), b/c the coverage check would complain)
        for index, entry in enumerate(entries):
            if index != 3:
                assert entry["reported_wait"] == 0.0

        # The delayed entry should have been held back for a significant part
        # of the full second, both as reported and as actually measured.
        delayed = entries[3]
        assert delayed["reported_wait"] > 0.7
        elapsed = delayed["entered_at"] - entries[2]["entered_at"]
        assert elapsed.total_seconds() > 0.7
199+
200+
119201
class _CallAndFuture(grpc.Call, grpc.Future):
    """Combined type that is both a ``grpc.Call`` and a ``grpc.Future``."""
121203

@@ -442,6 +524,22 @@ def test_reopen_failure_on_rpc_restart(self):
442524
assert bidi_rpc.is_active is False
443525
callback.assert_called_once_with(error2)
444526

527+
def test_using_throttle_on_reopen_requests(self):
    """Reopening a throttled RPC must enter the reopen throttle exactly once."""
    start_rpc = mock.create_autospec(
        grpc.StreamStreamMultiCallable, instance=True, return_value=CallStub([])
    )
    should_recover = mock.Mock(spec=["__call__"], return_value=True)
    bidi_rpc = bidi.ResumableBidiRpc(
        start_rpc, should_recover, throttle_reopen=True
    )

    # Patch __enter__ on the throttle's class, because the with statement
    # looks the method up on the type rather than on the instance.
    throttle_cls = bidi_rpc._reopen_throttle.__class__
    with mock.patch.object(throttle_cls, "__enter__") as mock_enter:
        bidi_rpc._reopen()

    mock_enter.assert_called_once()
542+
445543
def test_send_not_open(self):
446544
bidi_rpc = bidi.ResumableBidiRpc(None, lambda _: False)
447545

0 commit comments

Comments
 (0)