From fa98f3beafafe2d4304ebb10f61767dae3e94a15 Mon Sep 17 00:00:00 2001
From: Daniel Sanche
Date: Mon, 11 Sep 2023 13:37:41 -0700
Subject: [PATCH 01/33] updated showcase version for streaming retry tests

---
 noxfile.py               | 3 ++-
 tests/system/conftest.py | 4 ++++
 2 files changed, 6 insertions(+), 1 deletion(-)

diff --git a/noxfile.py b/noxfile.py
index afe77cfd9a..2accee78a6 100644
--- a/noxfile.py
+++ b/noxfile.py
@@ -29,7 +29,7 @@
 nox.options.error_on_missing_interpreters = True
 
-showcase_version = os.environ.get("SHOWCASE_VERSION", "0.25.0")
+showcase_version = os.environ.get("SHOWCASE_VERSION", "0.28.4")
 
 ADS_TEMPLATES = path.join(path.dirname(__file__), "gapic", "ads-templates")
 
@@ -225,6 +225,7 @@ def showcase_library(
         f"google/showcase/v1beta1/echo.proto",
         f"google/showcase/v1beta1/identity.proto",
         f"google/showcase/v1beta1/messaging.proto",
+        f"google/showcase/v1beta1/sequence.proto",
     )
     session.run(
         *cmd_tup, external=True,
diff --git a/tests/system/conftest.py b/tests/system/conftest.py
index 15b4f4c300..44cca1e8ab 100644
--- a/tests/system/conftest.py
+++ b/tests/system/conftest.py
@@ -21,6 +21,7 @@
 from google.api_core.client_options import ClientOptions # type: ignore
 from google.auth import credentials
 from google.showcase import EchoClient
+from google.showcase import SequenceServiceClient
 from google.showcase import IdentityClient
 from google.showcase import MessagingClient
 
@@ -133,6 +134,9 @@ def use_mtls(request):
 def echo(use_mtls, request):
     return construct_client(EchoClient, use_mtls, transport_name=request.param)
 
+@pytest.fixture(params=["grpc", "rest"])
+def sequence(use_mtls, request):
+    return construct_client(SequenceServiceClient, use_mtls, transport_name=request.param)
 
 @pytest.fixture(params=["grpc", "rest"])
 def identity(use_mtls, request):

From c6e28cdce52460c8711aff673ad747dbf7659d6c Mon Sep 17 00:00:00 2001
From: Daniel Sanche
Date: Mon, 11 Sep 2023 14:35:17 -0700
Subject: [PATCH 02/33] added successful test

---
 tests/system/test_retry_streaming.py | 39 ++++++++++++++++++++++++++++
 1 file changed, 39 insertions(+)
 create mode 100644 tests/system/test_retry_streaming.py

diff --git a/tests/system/test_retry_streaming.py b/tests/system/test_retry_streaming.py
new file mode 100644
index 0000000000..942d0ae754
--- /dev/null
+++ b/tests/system/test_retry_streaming.py
@@ -0,0 +1,39 @@
+# Copyright 2019 Google LLC
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# https://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+from google.rpc.status_pb2 import Status
+from datetime import timedelta
+
+
+def test_streaiming_retry_success(sequence):
+    """
+    Test a stream with a sigle success response
+    """
+    content = ['hello', 'world']
+    seq = sequence.create_streaming_sequence(
+        streaming_sequence={
+            'name': 'test_streaming_retry_success',
+            'content': ' '.join(content),
+            # single response with entire stream content
+            'responses': [{'status': Status(code=0), 'response_index': len(content)}],
+        }
+    )
+    it = sequence.attempt_streaming_sequence(name=seq.name)
+    results = [pb.content for pb in it]
+    assert results == content
+    # verify streaming report
+    report = sequence.get_streaming_sequence_report(name=f'{seq.name}/streamingSequenceReport')
+    assert len(report.attempts) == 1
+    assert report.attempts[0].status == Status(code=0)
+

From 8398997b336322c697bc6b85d1c1e645a082f936 Mon Sep 17 00:00:00 2001
From: Daniel Sanche
Date: Mon, 11 Sep 2023 16:08:54 -0700
Subject: [PATCH 03/33] added test for non-retried exception

---
 tests/system/test_retry_streaming.py | 35 +++++++++++++++++++++++++++
 1 file changed, 34 insertions(+), 1 deletion(-)

diff --git a/tests/system/test_retry_streaming.py b/tests/system/test_retry_streaming.py
index 942d0ae754..2d33f92dd3 100644
--- a/tests/system/test_retry_streaming.py
+++ b/tests/system/test_retry_streaming.py
@@ -12,11 +12,21 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 
+import pytest
 from google.rpc.status_pb2 import Status
 from datetime import timedelta
+from google.api_core import retry as retries
+from google.api_core import exceptions as core_exceptions
 
-def test_streaiming_retry_success(sequence):
+def _code_from_exc(exc):
+    """
+    return the grpc code from an exception
+    """
+    return exc.grpc_status_code.value[0]
+
+
+def test_streaming_retry_success(sequence):
     """
     Test a stream with a sigle success response
     """
@@ -37,3 +47,26 @@ def test_streaming_retry_success(sequence):
     assert len(report.attempts) == 1
     assert report.attempts[0].status == Status(code=0)
+
+def test_streaming_non_retryable_error(sequence):
+    """
+    Test a retryable stream failing with non-retryable error
+    """
+    retry = retries.Retry(predicate=retries.if_exception_type(), is_stream=True)
+    content = ['hello', 'world']
+    error = Status(code=_code_from_exc(core_exceptions.ServiceUnavailable), message='expected error')
+    seq = sequence.create_streaming_sequence(
+        streaming_sequence={
+            'name': 'test_streaming_retry_success',
+            'content': ' '.join(content),
+            'responses': [{'status': error, 'response_index': 0}],
+        }
+    )
+    with pytest.raises(core_exceptions.ServiceUnavailable):
+        it = sequence.attempt_streaming_sequence(name=seq.name, retry=retry)
+        next(it)
+    # verify streaming report
+    report = sequence.get_streaming_sequence_report(name=f'{seq.name}/streamingSequenceReport')
+    assert len(report.attempts) == 1
+    assert report.attempts[0].status == error
+

From 193e506dc5d3ef64a5e8c90c0f1f7cd58345e0e0 Mon Sep 17 00:00:00 2001
From: Daniel Sanche
Date: Mon, 11 Sep 2023 16:21:34 -0700
Subject: [PATCH 04/33] added test with transient errors

---
 tests/system/test_retry_streaming.py | 33 ++++++++++++++++++++++++++++
 1 file changed, 33 insertions(+)

diff --git a/tests/system/test_retry_streaming.py b/tests/system/test_retry_streaming.py
index 2d33f92dd3..2d643565ff 100644
--- a/tests/system/test_retry_streaming.py
+++ b/tests/system/test_retry_streaming.py
@@ -70,3 +70,36 @@ def test_streaming_non_retryable_error(sequence):
     assert len(report.attempts) == 1
     assert report.attempts[0].status == error
+def test_streaming_retryable_eventual_success(sequence):
+    """
+    Server returns a retryable error a number of times before success.
+    Retryable errors should not be presented to the end user.
+    """
+    retry = retries.Retry(
+        predicate=retries.if_exception_type(core_exceptions.ServiceUnavailable),
+        initial=0,
+        maximum=0,
+        timeout=1,
+        is_stream=True
+    )
+    content = ['hello', 'world']
+    error = Status(code=_code_from_exc(core_exceptions.ServiceUnavailable), message='transient error')
+    responses = [{'status': error, 'response_index': 0} for _ in range(3)] + [{'status': Status(code=0), 'response_index': len(content)}]
+    seq = sequence.create_streaming_sequence(
+        streaming_sequence={
+            'name': 'test_streaming_retry_success',
+            'content': ' '.join(content),
+            'responses': responses,
+        }
+    )
+    it = sequence.attempt_streaming_sequence(name=seq.name, retry=retry)
+    results = [pb.content for pb in it]
+    assert results == content
+    # verify streaming report
+    report = sequence.get_streaming_sequence_report(name=f'{seq.name}/streamingSequenceReport')
+    assert len(report.attempts) == 4
+    assert report.attempts[0].status == error
+    assert report.attempts[1].status == error
+    assert report.attempts[2].status == error
+    assert report.attempts[3].status == Status(code=0)
+

From f0d55dafc5c3c0cac285762ab49fe2102338b20d Mon Sep 17 00:00:00 2001
From: Daniel Sanche
Date: Mon, 11 Sep 2023 16:49:37 -0700
Subject: [PATCH 05/33] added test for retryable with partial data

---
 tests/system/test_retry_streaming.py | 37 +++++++++++++++++++++++++++-
 1 file changed, 36 insertions(+), 1 deletion(-)

diff --git a/tests/system/test_retry_streaming.py b/tests/system/test_retry_streaming.py
index 2d643565ff..8be00ab2f7 100644
--- a/tests/system/test_retry_streaming.py
+++ b/tests/system/test_retry_streaming.py
@@ -70,7 +70,7 @@ def test_streaming_non_retryable_error(sequence):
     assert len(report.attempts) == 1
     assert report.attempts[0].status == error
-def test_streaming_retryable_eventual_success(sequence):
+def test_streaming_transient_retryable(sequence):
     """
     Server returns a retryable error a number of times before success.
     Retryable errors should not be presented to the end user.
@@ -103,3 +103,38 @@ def test_streaming_retryable_eventual_success(sequence):
     assert report.attempts[2].status == error
     assert report.attempts[3].status == Status(code=0)
+
+def test_streaming_transient_retryable_partial_data(sequence):
+    """
+    Server stream yields some data before failing with a retryable error a number of times before success.
+ Wrapped stream should contain data from all attempts + """ + retry = retries.Retry( + predicate=retries.if_exception_type(core_exceptions.ServiceUnavailable), + initial=0, + maximum=0, + is_stream=True + ) + content = ['hello', 'world'] + error = Status(code=_code_from_exc(core_exceptions.ServiceUnavailable), message='transient error') + transient_error_list = [{'status': error, 'response_index': 1}] * 3 + responses = transient_error_list + [{'status': Status(code=0), 'response_index': len(content)}] + seq = sequence.create_streaming_sequence( + streaming_sequence={ + 'name': 'test_streaming_retry_success', + 'content': ' '.join(content), + 'responses': responses, + } + ) + it = sequence.attempt_streaming_sequence(name=seq.name, retry=retry) + results = [pb.content for pb in it] + assert results == ['hello'] * len(transient_error_list) + ['hello', 'world'] + # verify streaming report + report = sequence.get_streaming_sequence_report(name=f'{seq.name}/streamingSequenceReport') + assert len(report.attempts) == 4 + assert report.attempts[0].status == error + assert report.attempts[1].status == error + assert report.attempts[2].status == error + assert report.attempts[3].status == Status(code=0) + + From d562aeb3123d3e6b8e49387f3ed556b75eda0d7e Mon Sep 17 00:00:00 2001 From: Daniel Sanche Date: Mon, 11 Sep 2023 17:01:13 -0700 Subject: [PATCH 06/33] added retry test with eventual timeout --- tests/system/test_retry_streaming.py | 34 ++++++++++++++++++++++++++++ 1 file changed, 34 insertions(+) diff --git a/tests/system/test_retry_streaming.py b/tests/system/test_retry_streaming.py index 8be00ab2f7..0b1ab530bb 100644 --- a/tests/system/test_retry_streaming.py +++ b/tests/system/test_retry_streaming.py @@ -138,3 +138,37 @@ def test_streaming_transient_retryable_partial_data(sequence): assert report.attempts[3].status == Status(code=0) +def test_streaming_retryable_eventual_timeout(sequence): + """ + Server returns a retryable error a number of times before reaching timeout. + Should raise a retry error. 
+ """ + retry = retries.Retry( + predicate=retries.if_exception_type(core_exceptions.ServiceUnavailable), + initial=0, + maximum=0, + timeout=0.35, + is_stream=True + ) + content = ['hello', 'world'] + error = Status(code=_code_from_exc(core_exceptions.ServiceUnavailable), message='transient error') + transient_error_list = [{'status': error, 'response_index': 1, 'delay': timedelta(seconds=0.15)}] * 3 + responses = transient_error_list + [{'status': Status(code=0), 'response_index': len(content)}] + seq = sequence.create_streaming_sequence( + streaming_sequence={ + 'name': 'test_streaming_retry_success', + 'content': ' '.join(content), + 'responses': responses, + } + ) + with pytest.raises(core_exceptions.RetryError) as exc_info: + it = sequence.attempt_streaming_sequence(name=seq.name, retry=retry) + [pb.content for pb in it] + cause = exc_info.value.__cause__ + assert isinstance(cause, core_exceptions.ServiceUnavailable) + # verify streaming report + report = sequence.get_streaming_sequence_report(name=f'{seq.name}/streamingSequenceReport') + assert len(report.attempts) == 3 + assert report.attempts[0].status == error + assert report.attempts[1].status == error + assert report.attempts[2].status == error From e25866ec38cf28d0eb553f2f7cb4dc49e4c09a3c Mon Sep 17 00:00:00 2001 From: Daniel Sanche Date: Tue, 12 Sep 2023 10:54:58 -0700 Subject: [PATCH 07/33] added tests for on_error and backoff --- tests/system/test_retry_streaming.py | 86 ++++++++++++++++++++++++++-- 1 file changed, 80 insertions(+), 6 deletions(-) diff --git a/tests/system/test_retry_streaming.py b/tests/system/test_retry_streaming.py index 0b1ab530bb..e54df7afbe 100644 --- a/tests/system/test_retry_streaming.py +++ b/tests/system/test_retry_streaming.py @@ -13,6 +13,7 @@ # limitations under the License. 
import pytest +import mock from google.rpc.status_pb2 import Status from datetime import timedelta from google.api_core import retry as retries @@ -33,7 +34,7 @@ def test_streaming_retry_success(sequence): content = ['hello', 'world'] seq = sequence.create_streaming_sequence( streaming_sequence={ - 'name': 'test_streaming_retry_success', + 'name': __name__, 'content': ' '.join(content), # single response with entire stream content 'responses': [{'status': Status(code=0), 'response_index': len(content)}], @@ -57,7 +58,7 @@ def test_streaming_non_retryable_error(sequence): error = Status(code=_code_from_exc(core_exceptions.ServiceUnavailable), message='expected error') seq = sequence.create_streaming_sequence( streaming_sequence={ - 'name': 'test_streaming_retry_success', + 'name': __name__, 'content': ' '.join(content), 'responses': [{'status': error, 'response_index': 0}], } @@ -87,7 +88,7 @@ def test_streaming_transient_retryable(sequence): responses = [{'status': error, 'response_index': 0} for _ in range(3)] + [{'status': Status(code=0), 'response_index': len(content)}] seq = sequence.create_streaming_sequence( streaming_sequence={ - 'name': 'test_streaming_retry_success', + 'name': __name__, 'content': ' '.join(content), 'responses': responses, } @@ -121,7 +122,7 @@ def test_streaming_transient_retryable_partial_data(sequence): responses = transient_error_list + [{'status': Status(code=0), 'response_index': len(content)}] seq = sequence.create_streaming_sequence( streaming_sequence={ - 'name': 'test_streaming_retry_success', + 'name': __name__, 'content': ' '.join(content), 'responses': responses, } @@ -152,11 +153,11 @@ def test_streaming_retryable_eventual_timeout(sequence): ) content = ['hello', 'world'] error = Status(code=_code_from_exc(core_exceptions.ServiceUnavailable), message='transient error') - transient_error_list = [{'status': error, 'response_index': 1, 'delay': timedelta(seconds=0.15)}] * 3 + transient_error_list = [{'status': error, 'response_index': 1, 'delay': timedelta(seconds=0.15)}] * 10 responses = transient_error_list + [{'status': Status(code=0), 'response_index': len(content)}] seq = sequence.create_streaming_sequence( streaming_sequence={ - 'name': 'test_streaming_retry_success', + 'name': __name__, 'content': ' '.join(content), 'responses': responses, } @@ -172,3 +173,76 @@ def test_streaming_retryable_eventual_timeout(sequence): assert report.attempts[0].status == error assert report.attempts[1].status == error assert report.attempts[2].status == error + +def test_streaming_retry_on_error(sequence): + """ + on_error should be called for all retryable errors as they are encountered + """ + encountered_excs = [] + def on_error(exc): + encountered_excs.append(exc) + + retry = retries.Retry( + predicate=retries.if_exception_type(core_exceptions.ServiceUnavailable, core_exceptions.GatewayTimeout), + initial=0, + maximum=0, + on_error=on_error, + is_stream=True + ) + content = ['hello', 'world'] + errors = [core_exceptions.ServiceUnavailable, core_exceptions.DeadlineExceeded, core_exceptions.NotFound] + responses = [{'status': Status(code=_code_from_exc(exc))} for exc in errors] + seq = sequence.create_streaming_sequence( + streaming_sequence={ + 'name': __name__, + 'content': ' '.join(content), + 'responses': responses, + } + ) + with pytest.raises(core_exceptions.NotFound): + it = sequence.attempt_streaming_sequence(name=seq.name, retry=retry) + [pb.content for pb in it] + # on_error should have been called on the first two errors, but not the terminal one + 
assert len(encountered_excs) == 2 + assert isinstance(encountered_excs[0], core_exceptions.ServiceUnavailable) + # rest raises superclass GatewayTimeout in place of DeadlineExceeded + assert isinstance(encountered_excs[1], (core_exceptions.DeadlineExceeded, core_exceptions.GatewayTimeout)) + + +@pytest.mark.parametrize('initial,multiplier,maximum,expected', [ + (0.1, 1.0, 0.5, [0.1, 0.1, 0.1]), + (0, 2.0, 0.5, [0, 0]), + (0.1, 2.0, 0.5, [0.1, 0.2, 0.4, 0.5, 0.5]), + (1, 1.5, 5, [1, 1.5, 2.25, 3.375, 5, 5]), +]) +def test_streaming_retry_sleep_generator(sequence, initial, multiplier, maximum, expected): + """ + should be able to pass in sleep generator to control backoff + """ + retry = retries.Retry( + predicate=retries.if_exception_type(core_exceptions.ServiceUnavailable), + initial=initial, + maximum=maximum, + multiplier=multiplier, + is_stream=True + ) + content = ['hello', 'world'] + error = Status(code=_code_from_exc(core_exceptions.ServiceUnavailable), message='transient error') + transient_error_list = [{'status': error}] * len(expected) + responses = transient_error_list + [{'status': Status(code=0), 'response_index': len(content)}] + seq = sequence.create_streaming_sequence( + streaming_sequence={ + 'name': __name__, + 'content': ' '.join(content), + 'responses': responses, + } + ) + with mock.patch("random.uniform") as mock_uniform: + # make sleep generator deterministic + mock_uniform.side_effect = lambda a, b: b + with mock.patch("time.sleep") as mock_sleep: + it = sequence.attempt_streaming_sequence(name=seq.name, retry=retry) + [pb.content for pb in it] + assert mock_sleep.call_count == len(expected) + # ensure that sleep times match expected + assert mock_sleep.call_args_list == [mock.call(sleep_time) for sleep_time in expected] From 5ed69f580321fff9a400530e0adaaebe044347e2 Mon Sep 17 00:00:00 2001 From: Daniel Sanche Date: Tue, 12 Sep 2023 11:18:42 -0700 Subject: [PATCH 08/33] added async streaming tests --- tests/system/conftest.py | 11 + tests/system/test_retry_streaming_async.py | 254 +++++++++++++++++++++ 2 files changed, 265 insertions(+) create mode 100644 tests/system/test_retry_streaming_async.py diff --git a/tests/system/conftest.py b/tests/system/conftest.py index 44cca1e8ab..b1b797c0ee 100644 --- a/tests/system/conftest.py +++ b/tests/system/conftest.py @@ -30,6 +30,7 @@ import asyncio from google.showcase import EchoAsyncClient from google.showcase import IdentityAsyncClient + from google.showcase import SequenceServiceAsyncClient _test_event_loop = asyncio.new_event_loop() asyncio.set_event_loop(_test_event_loop) @@ -61,6 +62,16 @@ def async_identity(use_mtls, event_loop): channel_creator=aio.insecure_channel ) + @pytest.fixture + def async_sequence(use_mtls, event_loop): + return construct_client( + SequenceServiceAsyncClient, + use_mtls, + transport_name="grpc_asyncio", + channel_creator=aio.insecure_channel + ) + + dir = os.path.dirname(__file__) with open(os.path.join(dir, "../cert/mtls.crt"), "rb") as fh: diff --git a/tests/system/test_retry_streaming_async.py b/tests/system/test_retry_streaming_async.py new file mode 100644 index 0000000000..68127c9524 --- /dev/null +++ b/tests/system/test_retry_streaming_async.py @@ -0,0 +1,254 @@ +# Copyright 2019 Google LLC +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. 
+# You may obtain a copy of the License at +# +# https://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import asyncio +import pytest +import mock +from google.rpc.status_pb2 import Status +from datetime import timedelta +from google.api_core import retry as retries +from google.api_core import retry_async as retries_async +from google.api_core import exceptions as core_exceptions + +from test_retry_streaming import _code_from_exc + + +@pytest.mark.asyncio +async def test_async_streaming_retry_success(async_sequence): + """ + Test a stream with a sigle success response + """ + retry = retries_async.AsyncRetry(predicate=retries.if_exception_type(), is_stream=True) + content = ['hello', 'world'] + seq = await async_sequence.create_streaming_sequence( + streaming_sequence={ + 'name': __name__, + 'content': ' '.join(content), + # single response with entire stream content + 'responses': [{'status': Status(code=0), 'response_index': len(content)}], + } + ) + it = async_sequence.attempt_streaming_sequence(name=seq.name, retry=retry) + results = [pb.content async for pb in it] + assert results == content + # verify streaming report + report = await async_sequence.get_streaming_sequence_report(name=f'{seq.name}/streamingSequenceReport') + assert len(report.attempts) == 1 + assert report.attempts[0].status == Status(code=0) + + +@pytest.mark.asyncio +async def test_async_streaming_non_retryable_error(async_sequence): + """ + Test a retryable stream failing with non-retryable error + """ + retry = retries_async.AsyncRetry(predicate=retries.if_exception_type(), is_stream=True) + content = ['hello', 'world'] + error = Status(code=_code_from_exc(core_exceptions.ServiceUnavailable), message='expected error') + seq = await async_sequence.create_streaming_sequence( + streaming_sequence={ + 'name': __name__, + 'content': ' '.join(content), + 'responses': [{'status': error, 'response_index': 0}], + } + ) + with pytest.raises(core_exceptions.ServiceUnavailable): + it = async_sequence.attempt_streaming_sequence(name=seq.name, retry=retry) + await it.__anext__() + # verify streaming report + report = await async_sequence.get_streaming_sequence_report(name=f'{seq.name}/streamingSequenceReport') + assert len(report.attempts) == 1 + assert report.attempts[0].status == error + +@pytest.mark.asyncio +async def test_async_streaming_transient_retryable(async_sequence): + """ + Server returns a retryable error a number of times before success. + Retryable errors should not be presented to the end user. 
+ """ + retry = retries_async.AsyncRetry( + predicate=retries.if_exception_type(core_exceptions.ServiceUnavailable), + initial=0, + maximum=0, + timeout=1, + is_stream=True + ) + content = ['hello', 'world'] + error = Status(code=_code_from_exc(core_exceptions.ServiceUnavailable), message='transient error') + responses = [{'status': error, 'response_index': 0} for _ in range(3)] + [{'status': Status(code=0), 'response_index': len(content)}] + seq = await async_sequence.create_streaming_sequence( + streaming_sequence={ + 'name': __name__, + 'content': ' '.join(content), + 'responses': responses, + } + ) + it = async_sequence.attempt_streaming_sequence(name=seq.name, retry=retry) + results = [pb.content async for pb in it] + assert results == content + # verify streaming report + report = await async_sequence.get_streaming_sequence_report(name=f'{seq.name}/streamingSequenceReport') + assert len(report.attempts) == 4 + assert report.attempts[0].status == error + assert report.attempts[1].status == error + assert report.attempts[2].status == error + assert report.attempts[3].status == Status(code=0) + + +@pytest.mark.asyncio +async def test_async_streaming_transient_retryable_partial_data(async_sequence): + """ + Server stream yields some data before failing with a retryable error a number of times before success. + Wrapped stream should contain data from all attempts + """ + retry = retries_async.AsyncRetry( + predicate=retries.if_exception_type(core_exceptions.ServiceUnavailable), + initial=0, + maximum=0, + is_stream=True + ) + content = ['hello', 'world'] + error = Status(code=_code_from_exc(core_exceptions.ServiceUnavailable), message='transient error') + transient_error_list = [{'status': error, 'response_index': 1}] * 3 + responses = transient_error_list + [{'status': Status(code=0), 'response_index': len(content)}] + seq = await async_sequence.create_streaming_sequence( + streaming_sequence={ + 'name': __name__, + 'content': ' '.join(content), + 'responses': responses, + } + ) + it = async_sequence.attempt_streaming_sequence(name=seq.name, retry=retry) + results = [pb.content async for pb in it] + assert results == ['hello'] * len(transient_error_list) + ['hello', 'world'] + # verify streaming report + report = await async_sequence.get_streaming_sequence_report(name=f'{seq.name}/streamingSequenceReport') + assert len(report.attempts) == 4 + assert report.attempts[0].status == error + assert report.attempts[1].status == error + assert report.attempts[2].status == error + assert report.attempts[3].status == Status(code=0) + + +@pytest.mark.asyncio +async def test_async_streaming_retryable_eventual_timeout(async_sequence): + """ + Server returns a retryable error a number of times before reaching timeout. + Should raise a retry error. 
+ """ + retry = retries_async.AsyncRetry( + predicate=retries.if_exception_type(core_exceptions.ServiceUnavailable), + initial=0, + maximum=0, + timeout=0.35, + is_stream=True + ) + content = ['hello', 'world'] + error = Status(code=_code_from_exc(core_exceptions.ServiceUnavailable), message='transient error') + transient_error_list = [{'status': error, 'response_index': 1, 'delay': timedelta(seconds=0.15)}] * 10 + responses = transient_error_list + [{'status': Status(code=0), 'response_index': len(content)}] + seq = await async_sequence.create_streaming_sequence( + streaming_sequence={ + 'name': __name__, + 'content': ' '.join(content), + 'responses': responses, + } + ) + with pytest.raises(core_exceptions.RetryError) as exc_info: + it = async_sequence.attempt_streaming_sequence(name=seq.name, retry=retry) + [pb.content async for pb in it] + cause = exc_info.value.__cause__ + assert isinstance(cause, core_exceptions.ServiceUnavailable) + # verify streaming report + report = await async_sequence.get_streaming_sequence_report(name=f'{seq.name}/streamingSequenceReport') + assert len(report.attempts) == 3 + assert report.attempts[0].status == error + assert report.attempts[1].status == error + assert report.attempts[2].status == error + + +@pytest.mark.asyncio +async def test_async_streaming_retry_on_error(async_sequence): + """ + on_error should be called for all retryable errors as they are encountered + """ + encountered_excs = [] + def on_error(exc): + encountered_excs.append(exc) + + retry = retries_async.AsyncRetry( + predicate=retries.if_exception_type(core_exceptions.ServiceUnavailable, core_exceptions.GatewayTimeout), + initial=0, + maximum=0, + on_error=on_error, + is_stream=True + ) + content = ['hello', 'world'] + errors = [core_exceptions.ServiceUnavailable, core_exceptions.DeadlineExceeded, core_exceptions.NotFound] + responses = [{'status': Status(code=_code_from_exc(exc))} for exc in errors] + seq = await async_sequence.create_streaming_sequence( + streaming_sequence={ + 'name': __name__, + 'content': ' '.join(content), + 'responses': responses, + } + ) + with pytest.raises(core_exceptions.NotFound): + it = async_sequence.attempt_streaming_sequence(name=seq.name, retry=retry) + [pb.content async for pb in it] + # on_error should have been called on the first two errors, but not the terminal one + assert len(encountered_excs) == 2 + assert isinstance(encountered_excs[0], core_exceptions.ServiceUnavailable) + # rest raises superclass GatewayTimeout in place of DeadlineExceeded + assert isinstance(encountered_excs[1], (core_exceptions.DeadlineExceeded, core_exceptions.GatewayTimeout)) + + +@pytest.mark.parametrize('initial,multiplier,maximum,expected', [ + (0.1, 1.0, 0.5, [0.1, 0.1, 0.1]), + (0, 2.0, 0.5, [0, 0]), + (0.1, 2.0, 0.5, [0.1, 0.2, 0.4, 0.5, 0.5]), + (1, 1.5, 5, [1, 1.5, 2.25, 3.375, 5, 5]), +]) +@pytest.mark.asyncio +async def test_async_streaming_retry_sleep_generator(async_sequence, initial, multiplier, maximum, expected): + """ + should be able to pass in sleep generator to control backoff + """ + retry = retries_async.AsyncRetry( + predicate=retries.if_exception_type(core_exceptions.ServiceUnavailable), + initial=initial, + maximum=maximum, + multiplier=multiplier, + is_stream=True + ) + content = ['hello', 'world'] + error = Status(code=_code_from_exc(core_exceptions.ServiceUnavailable), message='transient error') + transient_error_list = [{'status': error}] * len(expected) + responses = transient_error_list + [{'status': Status(code=0), 'response_index': 
len(content)}] + seq = await async_sequence.create_streaming_sequence( + streaming_sequence={ + 'name': __name__, + 'content': ' '.join(content), + 'responses': responses, + } + ) + with mock.patch("random.uniform") as mock_uniform: + # make sleep generator deterministic + mock_uniform.side_effect = lambda a, b: b + with mock.patch("asyncio.sleep") as mock_sleep: + it = async_sequence.attempt_streaming_sequence(name=seq.name, retry=retry) + [pb.content async for pb in it] + assert mock_sleep.call_count == len(expected) + # ensure that sleep times match expected + assert mock_sleep.call_args_list == [mock.call(sleep_time) for sleep_time in expected] From 1acfa909ae22ed8aa66e34397e5c503135025161 Mon Sep 17 00:00:00 2001 From: Daniel Sanche Date: Tue, 12 Sep 2023 11:19:20 -0700 Subject: [PATCH 09/33] added retry to success test --- tests/system/test_retry_streaming.py | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/tests/system/test_retry_streaming.py b/tests/system/test_retry_streaming.py index e54df7afbe..29bdea98e7 100644 --- a/tests/system/test_retry_streaming.py +++ b/tests/system/test_retry_streaming.py @@ -31,6 +31,7 @@ def test_streaming_retry_success(sequence): """ Test a stream with a sigle success response """ + retry = retries.Retry(predicate=retries.if_exception_type(), is_stream=True) content = ['hello', 'world'] seq = sequence.create_streaming_sequence( streaming_sequence={ @@ -40,7 +41,7 @@ def test_streaming_retry_success(sequence): 'responses': [{'status': Status(code=0), 'response_index': len(content)}], } ) - it = sequence.attempt_streaming_sequence(name=seq.name) + it = sequence.attempt_streaming_sequence(name=seq.name, retry=retry) results = [pb.content for pb in it] assert results == content # verify streaming report From 35ae4054c244fa4bc2c11904baeb32d5e6e44a5a Mon Sep 17 00:00:00 2001 From: Daniel Sanche Date: Tue, 12 Sep 2023 11:20:55 -0700 Subject: [PATCH 10/33] ran black --- tests/system/test_retry_streaming.py | 180 +++++++++++++------- tests/system/test_retry_streaming_async.py | 187 ++++++++++++++------- 2 files changed, 239 insertions(+), 128 deletions(-) diff --git a/tests/system/test_retry_streaming.py b/tests/system/test_retry_streaming.py index 29bdea98e7..c22d3dd0f6 100644 --- a/tests/system/test_retry_streaming.py +++ b/tests/system/test_retry_streaming.py @@ -32,20 +32,22 @@ def test_streaming_retry_success(sequence): Test a stream with a sigle success response """ retry = retries.Retry(predicate=retries.if_exception_type(), is_stream=True) - content = ['hello', 'world'] + content = ["hello", "world"] seq = sequence.create_streaming_sequence( streaming_sequence={ - 'name': __name__, - 'content': ' '.join(content), + "name": __name__, + "content": " ".join(content), # single response with entire stream content - 'responses': [{'status': Status(code=0), 'response_index': len(content)}], + "responses": [{"status": Status(code=0), "response_index": len(content)}], } ) it = sequence.attempt_streaming_sequence(name=seq.name, retry=retry) results = [pb.content for pb in it] assert results == content # verify streaming report - report = sequence.get_streaming_sequence_report(name=f'{seq.name}/streamingSequenceReport') + report = sequence.get_streaming_sequence_report( + name=f"{seq.name}/streamingSequenceReport" + ) assert len(report.attempts) == 1 assert report.attempts[0].status == Status(code=0) @@ -55,23 +57,29 @@ def test_streaming_non_retryable_error(sequence): Test a retryable stream failing with non-retryable error """ retry = 
retries.Retry(predicate=retries.if_exception_type(), is_stream=True) - content = ['hello', 'world'] - error = Status(code=_code_from_exc(core_exceptions.ServiceUnavailable), message='expected error') + content = ["hello", "world"] + error = Status( + code=_code_from_exc(core_exceptions.ServiceUnavailable), + message="expected error", + ) seq = sequence.create_streaming_sequence( streaming_sequence={ - 'name': __name__, - 'content': ' '.join(content), - 'responses': [{'status': error, 'response_index': 0}], + "name": __name__, + "content": " ".join(content), + "responses": [{"status": error, "response_index": 0}], } ) with pytest.raises(core_exceptions.ServiceUnavailable): it = sequence.attempt_streaming_sequence(name=seq.name, retry=retry) next(it) # verify streaming report - report = sequence.get_streaming_sequence_report(name=f'{seq.name}/streamingSequenceReport') + report = sequence.get_streaming_sequence_report( + name=f"{seq.name}/streamingSequenceReport" + ) assert len(report.attempts) == 1 assert report.attempts[0].status == error + def test_streaming_transient_retryable(sequence): """ Server returns a retryable error a number of times before success. @@ -82,23 +90,30 @@ def test_streaming_transient_retryable(sequence): initial=0, maximum=0, timeout=1, - is_stream=True + is_stream=True, ) - content = ['hello', 'world'] - error = Status(code=_code_from_exc(core_exceptions.ServiceUnavailable), message='transient error') - responses = [{'status': error, 'response_index': 0} for _ in range(3)] + [{'status': Status(code=0), 'response_index': len(content)}] + content = ["hello", "world"] + error = Status( + code=_code_from_exc(core_exceptions.ServiceUnavailable), + message="transient error", + ) + responses = [{"status": error, "response_index": 0} for _ in range(3)] + [ + {"status": Status(code=0), "response_index": len(content)} + ] seq = sequence.create_streaming_sequence( streaming_sequence={ - 'name': __name__, - 'content': ' '.join(content), - 'responses': responses, + "name": __name__, + "content": " ".join(content), + "responses": responses, } ) it = sequence.attempt_streaming_sequence(name=seq.name, retry=retry) results = [pb.content for pb in it] assert results == content # verify streaming report - report = sequence.get_streaming_sequence_report(name=f'{seq.name}/streamingSequenceReport') + report = sequence.get_streaming_sequence_report( + name=f"{seq.name}/streamingSequenceReport" + ) assert len(report.attempts) == 4 assert report.attempts[0].status == error assert report.attempts[1].status == error @@ -115,24 +130,31 @@ def test_streaming_transient_retryable_partial_data(sequence): predicate=retries.if_exception_type(core_exceptions.ServiceUnavailable), initial=0, maximum=0, - is_stream=True + is_stream=True, + ) + content = ["hello", "world"] + error = Status( + code=_code_from_exc(core_exceptions.ServiceUnavailable), + message="transient error", ) - content = ['hello', 'world'] - error = Status(code=_code_from_exc(core_exceptions.ServiceUnavailable), message='transient error') - transient_error_list = [{'status': error, 'response_index': 1}] * 3 - responses = transient_error_list + [{'status': Status(code=0), 'response_index': len(content)}] + transient_error_list = [{"status": error, "response_index": 1}] * 3 + responses = transient_error_list + [ + {"status": Status(code=0), "response_index": len(content)} + ] seq = sequence.create_streaming_sequence( streaming_sequence={ - 'name': __name__, - 'content': ' '.join(content), - 'responses': responses, + "name": __name__, + 
"content": " ".join(content), + "responses": responses, } ) it = sequence.attempt_streaming_sequence(name=seq.name, retry=retry) results = [pb.content for pb in it] - assert results == ['hello'] * len(transient_error_list) + ['hello', 'world'] + assert results == ["hello"] * len(transient_error_list) + ["hello", "world"] # verify streaming report - report = sequence.get_streaming_sequence_report(name=f'{seq.name}/streamingSequenceReport') + report = sequence.get_streaming_sequence_report( + name=f"{seq.name}/streamingSequenceReport" + ) assert len(report.attempts) == 4 assert report.attempts[0].status == error assert report.attempts[1].status == error @@ -150,17 +172,24 @@ def test_streaming_retryable_eventual_timeout(sequence): initial=0, maximum=0, timeout=0.35, - is_stream=True + is_stream=True, + ) + content = ["hello", "world"] + error = Status( + code=_code_from_exc(core_exceptions.ServiceUnavailable), + message="transient error", ) - content = ['hello', 'world'] - error = Status(code=_code_from_exc(core_exceptions.ServiceUnavailable), message='transient error') - transient_error_list = [{'status': error, 'response_index': 1, 'delay': timedelta(seconds=0.15)}] * 10 - responses = transient_error_list + [{'status': Status(code=0), 'response_index': len(content)}] + transient_error_list = [ + {"status": error, "response_index": 1, "delay": timedelta(seconds=0.15)} + ] * 10 + responses = transient_error_list + [ + {"status": Status(code=0), "response_index": len(content)} + ] seq = sequence.create_streaming_sequence( streaming_sequence={ - 'name': __name__, - 'content': ' '.join(content), - 'responses': responses, + "name": __name__, + "content": " ".join(content), + "responses": responses, } ) with pytest.raises(core_exceptions.RetryError) as exc_info: @@ -169,35 +198,45 @@ def test_streaming_retryable_eventual_timeout(sequence): cause = exc_info.value.__cause__ assert isinstance(cause, core_exceptions.ServiceUnavailable) # verify streaming report - report = sequence.get_streaming_sequence_report(name=f'{seq.name}/streamingSequenceReport') + report = sequence.get_streaming_sequence_report( + name=f"{seq.name}/streamingSequenceReport" + ) assert len(report.attempts) == 3 assert report.attempts[0].status == error assert report.attempts[1].status == error assert report.attempts[2].status == error + def test_streaming_retry_on_error(sequence): """ on_error should be called for all retryable errors as they are encountered """ encountered_excs = [] + def on_error(exc): encountered_excs.append(exc) retry = retries.Retry( - predicate=retries.if_exception_type(core_exceptions.ServiceUnavailable, core_exceptions.GatewayTimeout), + predicate=retries.if_exception_type( + core_exceptions.ServiceUnavailable, core_exceptions.GatewayTimeout + ), initial=0, maximum=0, on_error=on_error, - is_stream=True + is_stream=True, ) - content = ['hello', 'world'] - errors = [core_exceptions.ServiceUnavailable, core_exceptions.DeadlineExceeded, core_exceptions.NotFound] - responses = [{'status': Status(code=_code_from_exc(exc))} for exc in errors] + content = ["hello", "world"] + errors = [ + core_exceptions.ServiceUnavailable, + core_exceptions.DeadlineExceeded, + core_exceptions.NotFound, + ] + responses = [{"status": Status(code=_code_from_exc(exc))} for exc in errors] seq = sequence.create_streaming_sequence( streaming_sequence={ - 'name': __name__, - 'content': ' '.join(content), - 'responses': responses, + "name": __name__, + "content": " ".join(content), + "responses": responses, } ) with 
pytest.raises(core_exceptions.NotFound): @@ -207,16 +246,24 @@ def on_error(exc): assert len(encountered_excs) == 2 assert isinstance(encountered_excs[0], core_exceptions.ServiceUnavailable) # rest raises superclass GatewayTimeout in place of DeadlineExceeded - assert isinstance(encountered_excs[1], (core_exceptions.DeadlineExceeded, core_exceptions.GatewayTimeout)) + assert isinstance( + encountered_excs[1], + (core_exceptions.DeadlineExceeded, core_exceptions.GatewayTimeout), + ) -@pytest.mark.parametrize('initial,multiplier,maximum,expected', [ - (0.1, 1.0, 0.5, [0.1, 0.1, 0.1]), - (0, 2.0, 0.5, [0, 0]), - (0.1, 2.0, 0.5, [0.1, 0.2, 0.4, 0.5, 0.5]), - (1, 1.5, 5, [1, 1.5, 2.25, 3.375, 5, 5]), -]) -def test_streaming_retry_sleep_generator(sequence, initial, multiplier, maximum, expected): +@pytest.mark.parametrize( + "initial,multiplier,maximum,expected", + [ + (0.1, 1.0, 0.5, [0.1, 0.1, 0.1]), + (0, 2.0, 0.5, [0, 0]), + (0.1, 2.0, 0.5, [0.1, 0.2, 0.4, 0.5, 0.5]), + (1, 1.5, 5, [1, 1.5, 2.25, 3.375, 5, 5]), + ], +) +def test_streaming_retry_sleep_generator( + sequence, initial, multiplier, maximum, expected +): """ should be able to pass in sleep generator to control backoff """ @@ -225,17 +272,22 @@ def test_streaming_retry_sleep_generator(sequence, initial, multiplier, maximum, initial=initial, maximum=maximum, multiplier=multiplier, - is_stream=True + is_stream=True, + ) + content = ["hello", "world"] + error = Status( + code=_code_from_exc(core_exceptions.ServiceUnavailable), + message="transient error", ) - content = ['hello', 'world'] - error = Status(code=_code_from_exc(core_exceptions.ServiceUnavailable), message='transient error') - transient_error_list = [{'status': error}] * len(expected) - responses = transient_error_list + [{'status': Status(code=0), 'response_index': len(content)}] + transient_error_list = [{"status": error}] * len(expected) + responses = transient_error_list + [ + {"status": Status(code=0), "response_index": len(content)} + ] seq = sequence.create_streaming_sequence( streaming_sequence={ - 'name': __name__, - 'content': ' '.join(content), - 'responses': responses, + "name": __name__, + "content": " ".join(content), + "responses": responses, } ) with mock.patch("random.uniform") as mock_uniform: @@ -246,4 +298,6 @@ def test_streaming_retry_sleep_generator(sequence, initial, multiplier, maximum, [pb.content for pb in it] assert mock_sleep.call_count == len(expected) # ensure that sleep times match expected - assert mock_sleep.call_args_list == [mock.call(sleep_time) for sleep_time in expected] + assert mock_sleep.call_args_list == [ + mock.call(sleep_time) for sleep_time in expected + ] diff --git a/tests/system/test_retry_streaming_async.py b/tests/system/test_retry_streaming_async.py index 68127c9524..4a6e3c4b3b 100644 --- a/tests/system/test_retry_streaming_async.py +++ b/tests/system/test_retry_streaming_async.py @@ -29,21 +29,25 @@ async def test_async_streaming_retry_success(async_sequence): """ Test a stream with a sigle success response """ - retry = retries_async.AsyncRetry(predicate=retries.if_exception_type(), is_stream=True) - content = ['hello', 'world'] + retry = retries_async.AsyncRetry( + predicate=retries.if_exception_type(), is_stream=True + ) + content = ["hello", "world"] seq = await async_sequence.create_streaming_sequence( streaming_sequence={ - 'name': __name__, - 'content': ' '.join(content), + "name": __name__, + "content": " ".join(content), # single response with entire stream content - 'responses': [{'status': Status(code=0), 
'response_index': len(content)}], + "responses": [{"status": Status(code=0), "response_index": len(content)}], } ) it = async_sequence.attempt_streaming_sequence(name=seq.name, retry=retry) results = [pb.content async for pb in it] assert results == content # verify streaming report - report = await async_sequence.get_streaming_sequence_report(name=f'{seq.name}/streamingSequenceReport') + report = await async_sequence.get_streaming_sequence_report( + name=f"{seq.name}/streamingSequenceReport" + ) assert len(report.attempts) == 1 assert report.attempts[0].status == Status(code=0) @@ -53,24 +57,32 @@ async def test_async_streaming_non_retryable_error(async_sequence): """ Test a retryable stream failing with non-retryable error """ - retry = retries_async.AsyncRetry(predicate=retries.if_exception_type(), is_stream=True) - content = ['hello', 'world'] - error = Status(code=_code_from_exc(core_exceptions.ServiceUnavailable), message='expected error') + retry = retries_async.AsyncRetry( + predicate=retries.if_exception_type(), is_stream=True + ) + content = ["hello", "world"] + error = Status( + code=_code_from_exc(core_exceptions.ServiceUnavailable), + message="expected error", + ) seq = await async_sequence.create_streaming_sequence( streaming_sequence={ - 'name': __name__, - 'content': ' '.join(content), - 'responses': [{'status': error, 'response_index': 0}], + "name": __name__, + "content": " ".join(content), + "responses": [{"status": error, "response_index": 0}], } ) with pytest.raises(core_exceptions.ServiceUnavailable): it = async_sequence.attempt_streaming_sequence(name=seq.name, retry=retry) await it.__anext__() # verify streaming report - report = await async_sequence.get_streaming_sequence_report(name=f'{seq.name}/streamingSequenceReport') + report = await async_sequence.get_streaming_sequence_report( + name=f"{seq.name}/streamingSequenceReport" + ) assert len(report.attempts) == 1 assert report.attempts[0].status == error + @pytest.mark.asyncio async def test_async_streaming_transient_retryable(async_sequence): """ @@ -82,23 +94,30 @@ async def test_async_streaming_transient_retryable(async_sequence): initial=0, maximum=0, timeout=1, - is_stream=True + is_stream=True, + ) + content = ["hello", "world"] + error = Status( + code=_code_from_exc(core_exceptions.ServiceUnavailable), + message="transient error", ) - content = ['hello', 'world'] - error = Status(code=_code_from_exc(core_exceptions.ServiceUnavailable), message='transient error') - responses = [{'status': error, 'response_index': 0} for _ in range(3)] + [{'status': Status(code=0), 'response_index': len(content)}] + responses = [{"status": error, "response_index": 0} for _ in range(3)] + [ + {"status": Status(code=0), "response_index": len(content)} + ] seq = await async_sequence.create_streaming_sequence( streaming_sequence={ - 'name': __name__, - 'content': ' '.join(content), - 'responses': responses, + "name": __name__, + "content": " ".join(content), + "responses": responses, } ) it = async_sequence.attempt_streaming_sequence(name=seq.name, retry=retry) results = [pb.content async for pb in it] assert results == content # verify streaming report - report = await async_sequence.get_streaming_sequence_report(name=f'{seq.name}/streamingSequenceReport') + report = await async_sequence.get_streaming_sequence_report( + name=f"{seq.name}/streamingSequenceReport" + ) assert len(report.attempts) == 4 assert report.attempts[0].status == error assert report.attempts[1].status == error @@ -116,24 +135,31 @@ async def 
test_async_streaming_transient_retryable_partial_data(async_sequence): predicate=retries.if_exception_type(core_exceptions.ServiceUnavailable), initial=0, maximum=0, - is_stream=True + is_stream=True, + ) + content = ["hello", "world"] + error = Status( + code=_code_from_exc(core_exceptions.ServiceUnavailable), + message="transient error", ) - content = ['hello', 'world'] - error = Status(code=_code_from_exc(core_exceptions.ServiceUnavailable), message='transient error') - transient_error_list = [{'status': error, 'response_index': 1}] * 3 - responses = transient_error_list + [{'status': Status(code=0), 'response_index': len(content)}] + transient_error_list = [{"status": error, "response_index": 1}] * 3 + responses = transient_error_list + [ + {"status": Status(code=0), "response_index": len(content)} + ] seq = await async_sequence.create_streaming_sequence( streaming_sequence={ - 'name': __name__, - 'content': ' '.join(content), - 'responses': responses, + "name": __name__, + "content": " ".join(content), + "responses": responses, } ) it = async_sequence.attempt_streaming_sequence(name=seq.name, retry=retry) results = [pb.content async for pb in it] - assert results == ['hello'] * len(transient_error_list) + ['hello', 'world'] + assert results == ["hello"] * len(transient_error_list) + ["hello", "world"] # verify streaming report - report = await async_sequence.get_streaming_sequence_report(name=f'{seq.name}/streamingSequenceReport') + report = await async_sequence.get_streaming_sequence_report( + name=f"{seq.name}/streamingSequenceReport" + ) assert len(report.attempts) == 4 assert report.attempts[0].status == error assert report.attempts[1].status == error @@ -152,17 +178,24 @@ async def test_async_streaming_retryable_eventual_timeout(async_sequence): initial=0, maximum=0, timeout=0.35, - is_stream=True + is_stream=True, + ) + content = ["hello", "world"] + error = Status( + code=_code_from_exc(core_exceptions.ServiceUnavailable), + message="transient error", ) - content = ['hello', 'world'] - error = Status(code=_code_from_exc(core_exceptions.ServiceUnavailable), message='transient error') - transient_error_list = [{'status': error, 'response_index': 1, 'delay': timedelta(seconds=0.15)}] * 10 - responses = transient_error_list + [{'status': Status(code=0), 'response_index': len(content)}] + transient_error_list = [ + {"status": error, "response_index": 1, "delay": timedelta(seconds=0.15)} + ] * 10 + responses = transient_error_list + [ + {"status": Status(code=0), "response_index": len(content)} + ] seq = await async_sequence.create_streaming_sequence( streaming_sequence={ - 'name': __name__, - 'content': ' '.join(content), - 'responses': responses, + "name": __name__, + "content": " ".join(content), + "responses": responses, } ) with pytest.raises(core_exceptions.RetryError) as exc_info: @@ -171,7 +204,9 @@ async def test_async_streaming_retryable_eventual_timeout(async_sequence): cause = exc_info.value.__cause__ assert isinstance(cause, core_exceptions.ServiceUnavailable) # verify streaming report - report = await async_sequence.get_streaming_sequence_report(name=f'{seq.name}/streamingSequenceReport') + report = await async_sequence.get_streaming_sequence_report( + name=f"{seq.name}/streamingSequenceReport" + ) assert len(report.attempts) == 3 assert report.attempts[0].status == error assert report.attempts[1].status == error @@ -184,24 +219,31 @@ async def test_async_streaming_retry_on_error(async_sequence): on_error should be called for all retryable errors as they are encountered 
""" encountered_excs = [] + def on_error(exc): encountered_excs.append(exc) retry = retries_async.AsyncRetry( - predicate=retries.if_exception_type(core_exceptions.ServiceUnavailable, core_exceptions.GatewayTimeout), + predicate=retries.if_exception_type( + core_exceptions.ServiceUnavailable, core_exceptions.GatewayTimeout + ), initial=0, maximum=0, on_error=on_error, - is_stream=True + is_stream=True, ) - content = ['hello', 'world'] - errors = [core_exceptions.ServiceUnavailable, core_exceptions.DeadlineExceeded, core_exceptions.NotFound] - responses = [{'status': Status(code=_code_from_exc(exc))} for exc in errors] + content = ["hello", "world"] + errors = [ + core_exceptions.ServiceUnavailable, + core_exceptions.DeadlineExceeded, + core_exceptions.NotFound, + ] + responses = [{"status": Status(code=_code_from_exc(exc))} for exc in errors] seq = await async_sequence.create_streaming_sequence( streaming_sequence={ - 'name': __name__, - 'content': ' '.join(content), - 'responses': responses, + "name": __name__, + "content": " ".join(content), + "responses": responses, } ) with pytest.raises(core_exceptions.NotFound): @@ -211,17 +253,25 @@ def on_error(exc): assert len(encountered_excs) == 2 assert isinstance(encountered_excs[0], core_exceptions.ServiceUnavailable) # rest raises superclass GatewayTimeout in place of DeadlineExceeded - assert isinstance(encountered_excs[1], (core_exceptions.DeadlineExceeded, core_exceptions.GatewayTimeout)) + assert isinstance( + encountered_excs[1], + (core_exceptions.DeadlineExceeded, core_exceptions.GatewayTimeout), + ) -@pytest.mark.parametrize('initial,multiplier,maximum,expected', [ - (0.1, 1.0, 0.5, [0.1, 0.1, 0.1]), - (0, 2.0, 0.5, [0, 0]), - (0.1, 2.0, 0.5, [0.1, 0.2, 0.4, 0.5, 0.5]), - (1, 1.5, 5, [1, 1.5, 2.25, 3.375, 5, 5]), -]) +@pytest.mark.parametrize( + "initial,multiplier,maximum,expected", + [ + (0.1, 1.0, 0.5, [0.1, 0.1, 0.1]), + (0, 2.0, 0.5, [0, 0]), + (0.1, 2.0, 0.5, [0.1, 0.2, 0.4, 0.5, 0.5]), + (1, 1.5, 5, [1, 1.5, 2.25, 3.375, 5, 5]), + ], +) @pytest.mark.asyncio -async def test_async_streaming_retry_sleep_generator(async_sequence, initial, multiplier, maximum, expected): +async def test_async_streaming_retry_sleep_generator( + async_sequence, initial, multiplier, maximum, expected +): """ should be able to pass in sleep generator to control backoff """ @@ -230,17 +280,22 @@ async def test_async_streaming_retry_sleep_generator(async_sequence, initial, mu initial=initial, maximum=maximum, multiplier=multiplier, - is_stream=True + is_stream=True, + ) + content = ["hello", "world"] + error = Status( + code=_code_from_exc(core_exceptions.ServiceUnavailable), + message="transient error", ) - content = ['hello', 'world'] - error = Status(code=_code_from_exc(core_exceptions.ServiceUnavailable), message='transient error') - transient_error_list = [{'status': error}] * len(expected) - responses = transient_error_list + [{'status': Status(code=0), 'response_index': len(content)}] + transient_error_list = [{"status": error}] * len(expected) + responses = transient_error_list + [ + {"status": Status(code=0), "response_index": len(content)} + ] seq = await async_sequence.create_streaming_sequence( streaming_sequence={ - 'name': __name__, - 'content': ' '.join(content), - 'responses': responses, + "name": __name__, + "content": " ".join(content), + "responses": responses, } ) with mock.patch("random.uniform") as mock_uniform: @@ -251,4 +306,6 @@ async def test_async_streaming_retry_sleep_generator(async_sequence, initial, mu [pb.content async for 
pb in it] assert mock_sleep.call_count == len(expected) # ensure that sleep times match expected - assert mock_sleep.call_args_list == [mock.call(sleep_time) for sleep_time in expected] + assert mock_sleep.call_args_list == [ + mock.call(sleep_time) for sleep_time in expected + ] From 899f5bccaffff3f9273426552824034ac5ddda14 Mon Sep 17 00:00:00 2001 From: Daniel Sanche Date: Tue, 12 Sep 2023 11:22:35 -0700 Subject: [PATCH 11/33] added awaits to wrapped sequences --- tests/system/test_retry_streaming_async.py | 14 +++++++------- 1 file changed, 7 insertions(+), 7 deletions(-) diff --git a/tests/system/test_retry_streaming_async.py b/tests/system/test_retry_streaming_async.py index 4a6e3c4b3b..1a91e04bae 100644 --- a/tests/system/test_retry_streaming_async.py +++ b/tests/system/test_retry_streaming_async.py @@ -41,7 +41,7 @@ async def test_async_streaming_retry_success(async_sequence): "responses": [{"status": Status(code=0), "response_index": len(content)}], } ) - it = async_sequence.attempt_streaming_sequence(name=seq.name, retry=retry) + it = await async_sequence.attempt_streaming_sequence(name=seq.name, retry=retry) results = [pb.content async for pb in it] assert results == content # verify streaming report @@ -73,7 +73,7 @@ async def test_async_streaming_non_retryable_error(async_sequence): } ) with pytest.raises(core_exceptions.ServiceUnavailable): - it = async_sequence.attempt_streaming_sequence(name=seq.name, retry=retry) + it = await async_sequence.attempt_streaming_sequence(name=seq.name, retry=retry) await it.__anext__() # verify streaming report report = await async_sequence.get_streaming_sequence_report( @@ -111,7 +111,7 @@ async def test_async_streaming_transient_retryable(async_sequence): "responses": responses, } ) - it = async_sequence.attempt_streaming_sequence(name=seq.name, retry=retry) + it = await async_sequence.attempt_streaming_sequence(name=seq.name, retry=retry) results = [pb.content async for pb in it] assert results == content # verify streaming report @@ -153,7 +153,7 @@ async def test_async_streaming_transient_retryable_partial_data(async_sequence): "responses": responses, } ) - it = async_sequence.attempt_streaming_sequence(name=seq.name, retry=retry) + it = await async_sequence.attempt_streaming_sequence(name=seq.name, retry=retry) results = [pb.content async for pb in it] assert results == ["hello"] * len(transient_error_list) + ["hello", "world"] # verify streaming report @@ -199,7 +199,7 @@ async def test_async_streaming_retryable_eventual_timeout(async_sequence): } ) with pytest.raises(core_exceptions.RetryError) as exc_info: - it = async_sequence.attempt_streaming_sequence(name=seq.name, retry=retry) + it = await async_sequence.attempt_streaming_sequence(name=seq.name, retry=retry) [pb.content async for pb in it] cause = exc_info.value.__cause__ assert isinstance(cause, core_exceptions.ServiceUnavailable) @@ -247,7 +247,7 @@ def on_error(exc): } ) with pytest.raises(core_exceptions.NotFound): - it = async_sequence.attempt_streaming_sequence(name=seq.name, retry=retry) + it = await async_sequence.attempt_streaming_sequence(name=seq.name, retry=retry) [pb.content async for pb in it] # on_error should have been called on the first two errors, but not the terminal one assert len(encountered_excs) == 2 @@ -302,7 +302,7 @@ async def test_async_streaming_retry_sleep_generator( # make sleep generator deterministic mock_uniform.side_effect = lambda a, b: b with mock.patch("asyncio.sleep") as mock_sleep: - it = 
async_sequence.attempt_streaming_sequence(name=seq.name, retry=retry) + it = await async_sequence.attempt_streaming_sequence(name=seq.name, retry=retry) [pb.content async for pb in it] assert mock_sleep.call_count == len(expected) # ensure that sleep times match expected From 2458b16979adb83c8c976e8835217e453b008bdb Mon Sep 17 00:00:00 2001 From: Owl Bot Date: Tue, 12 Sep 2023 18:58:33 +0000 Subject: [PATCH 12/33] =?UTF-8?q?=F0=9F=A6=89=20Updates=20from=20OwlBot=20?= =?UTF-8?q?post-processor?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit See https://github.com/googleapis/repo-automation-bots/blob/main/packages/owl-bot/README.md --- .github/auto-label.yaml | 2 +- .kokoro/build.sh | 2 +- .kokoro/docker/docs/Dockerfile | 2 +- .kokoro/populate-secrets.sh | 2 +- .kokoro/release.sh | 2 +- .kokoro/release/common.cfg | 9 ----- .kokoro/requirements.txt | 64 ++++++++++++++++------------------ .kokoro/trampoline.sh | 2 +- .kokoro/trampoline_v2.sh | 2 +- .trampolinerc | 4 ++- 10 files changed, 41 insertions(+), 50 deletions(-) diff --git a/.github/auto-label.yaml b/.github/auto-label.yaml index b2016d119b..41bff0b537 100644 --- a/.github/auto-label.yaml +++ b/.github/auto-label.yaml @@ -1,4 +1,4 @@ -# Copyright 2023 Google LLC +# Copyright 2022 Google LLC # # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. diff --git a/.kokoro/build.sh b/.kokoro/build.sh index ecf29aa67f..a8340f3a58 100755 --- a/.kokoro/build.sh +++ b/.kokoro/build.sh @@ -1,5 +1,5 @@ #!/bin/bash -# Copyright 2023 Google LLC +# Copyright 2018 Google LLC # # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. diff --git a/.kokoro/docker/docs/Dockerfile b/.kokoro/docker/docs/Dockerfile index 8e39a2cc43..f8137d0ae4 100644 --- a/.kokoro/docker/docs/Dockerfile +++ b/.kokoro/docker/docs/Dockerfile @@ -1,4 +1,4 @@ -# Copyright 2023 Google LLC +# Copyright 2020 Google LLC # # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. diff --git a/.kokoro/populate-secrets.sh b/.kokoro/populate-secrets.sh index 6f3972140e..f52514257e 100755 --- a/.kokoro/populate-secrets.sh +++ b/.kokoro/populate-secrets.sh @@ -1,5 +1,5 @@ #!/bin/bash -# Copyright 2023 Google LLC. +# Copyright 2020 Google LLC. # # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. diff --git a/.kokoro/release.sh b/.kokoro/release.sh index ea4f0153bf..336f0eca1e 100755 --- a/.kokoro/release.sh +++ b/.kokoro/release.sh @@ -1,5 +1,5 @@ #!/bin/bash -# Copyright 2023 Google LLC +# Copyright 2020 Google LLC # # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. diff --git a/.kokoro/release/common.cfg b/.kokoro/release/common.cfg index 1f5dced3c1..72000839ce 100644 --- a/.kokoro/release/common.cfg +++ b/.kokoro/release/common.cfg @@ -38,12 +38,3 @@ env_vars: { key: "SECRET_MANAGER_KEYS" value: "releasetool-publish-reporter-app,releasetool-publish-reporter-googleapis-installation,releasetool-publish-reporter-pem" } - -# Store the packages we uploaded to PyPI. That way, we have a record of exactly -# what we published, which we can use to generate SBOMs and attestations. 
-action { - define_artifacts { - regex: "github/gapic-generator-python/**/*.tar.gz" - strip_prefix: "github/gapic-generator-python" - } -} diff --git a/.kokoro/requirements.txt b/.kokoro/requirements.txt index 029bd342de..66a2172a76 100644 --- a/.kokoro/requirements.txt +++ b/.kokoro/requirements.txt @@ -20,9 +20,9 @@ cachetools==5.2.0 \ --hash=sha256:6a94c6402995a99c3970cc7e4884bb60b4a8639938157eeed436098bf9831757 \ --hash=sha256:f9f17d2aec496a9aa6b76f53e3b614c965223c061982d434d160f930c698a9db # via google-auth -certifi==2023.7.22 \ - --hash=sha256:539cc1d13202e33ca466e88b2807e29f4c13049d6d87031a3c110744495cb082 \ - --hash=sha256:92d6037539857d8206b8f6ae472e8b77db8058fec5937a1ef3f54304089edbb9 +certifi==2022.12.7 \ + --hash=sha256:35824b4c3a97115964b408844d64aa14db1cc518f6562e8d7261699d1350a9e3 \ + --hash=sha256:4ad3232f5e926d6718ec31cfc1fcadfde020920e278684144551c91769c7bc18 # via requests cffi==1.15.1 \ --hash=sha256:00a9ed42e88df81ffae7a8ab6d9356b371399b91dbdf0c3cb1e84c03a13aceb5 \ @@ -113,30 +113,28 @@ commonmark==0.9.1 \ --hash=sha256:452f9dc859be7f06631ddcb328b6919c67984aca654e5fefb3914d54691aed60 \ --hash=sha256:da2f38c92590f83de410ba1a3cbceafbc74fee9def35f9251ba9a971d6d66fd9 # via rich -cryptography==41.0.3 \ - --hash=sha256:0d09fb5356f975974dbcb595ad2d178305e5050656affb7890a1583f5e02a306 \ - --hash=sha256:23c2d778cf829f7d0ae180600b17e9fceea3c2ef8b31a99e3c694cbbf3a24b84 \ - --hash=sha256:3fb248989b6363906827284cd20cca63bb1a757e0a2864d4c1682a985e3dca47 \ - --hash=sha256:41d7aa7cdfded09b3d73a47f429c298e80796c8e825ddfadc84c8a7f12df212d \ - --hash=sha256:42cb413e01a5d36da9929baa9d70ca90d90b969269e5a12d39c1e0d475010116 \ - --hash=sha256:4c2f0d35703d61002a2bbdcf15548ebb701cfdd83cdc12471d2bae80878a4207 \ - --hash=sha256:4fd871184321100fb400d759ad0cddddf284c4b696568204d281c902fc7b0d81 \ - --hash=sha256:5259cb659aa43005eb55a0e4ff2c825ca111a0da1814202c64d28a985d33b087 \ - --hash=sha256:57a51b89f954f216a81c9d057bf1a24e2f36e764a1ca9a501a6964eb4a6800dd \ - --hash=sha256:652627a055cb52a84f8c448185922241dd5217443ca194d5739b44612c5e6507 \ - --hash=sha256:67e120e9a577c64fe1f611e53b30b3e69744e5910ff3b6e97e935aeb96005858 \ - --hash=sha256:6af1c6387c531cd364b72c28daa29232162010d952ceb7e5ca8e2827526aceae \ - --hash=sha256:6d192741113ef5e30d89dcb5b956ef4e1578f304708701b8b73d38e3e1461f34 \ - --hash=sha256:7efe8041897fe7a50863e51b77789b657a133c75c3b094e51b5e4b5cec7bf906 \ - --hash=sha256:84537453d57f55a50a5b6835622ee405816999a7113267739a1b4581f83535bd \ - --hash=sha256:8f09daa483aedea50d249ef98ed500569841d6498aa9c9f4b0531b9964658922 \ - --hash=sha256:95dd7f261bb76948b52a5330ba5202b91a26fbac13ad0e9fc8a3ac04752058c7 \ - --hash=sha256:a74fbcdb2a0d46fe00504f571a2a540532f4c188e6ccf26f1f178480117b33c4 \ - --hash=sha256:a983e441a00a9d57a4d7c91b3116a37ae602907a7618b882c8013b5762e80574 \ - --hash=sha256:ab8de0d091acbf778f74286f4989cf3d1528336af1b59f3e5d2ebca8b5fe49e1 \ - --hash=sha256:aeb57c421b34af8f9fe830e1955bf493a86a7996cc1338fe41b30047d16e962c \ - --hash=sha256:ce785cf81a7bdade534297ef9e490ddff800d956625020ab2ec2780a556c313e \ - --hash=sha256:d0d651aa754ef58d75cec6edfbd21259d93810b73f6ec246436a21b7841908de +cryptography==39.0.1 \ + --hash=sha256:0f8da300b5c8af9f98111ffd512910bc792b4c77392a9523624680f7956a99d4 \ + --hash=sha256:35f7c7d015d474f4011e859e93e789c87d21f6f4880ebdc29896a60403328f1f \ + --hash=sha256:5aa67414fcdfa22cf052e640cb5ddc461924a045cacf325cd164e65312d99502 \ + --hash=sha256:5d2d8b87a490bfcd407ed9d49093793d0f75198a35e6eb1a923ce1ee86c62b41 \ + 
--hash=sha256:6687ef6d0a6497e2b58e7c5b852b53f62142cfa7cd1555795758934da363a965 \ + --hash=sha256:6f8ba7f0328b79f08bdacc3e4e66fb4d7aab0c3584e0bd41328dce5262e26b2e \ + --hash=sha256:706843b48f9a3f9b9911979761c91541e3d90db1ca905fd63fee540a217698bc \ + --hash=sha256:807ce09d4434881ca3a7594733669bd834f5b2c6d5c7e36f8c00f691887042ad \ + --hash=sha256:83e17b26de248c33f3acffb922748151d71827d6021d98c70e6c1a25ddd78505 \ + --hash=sha256:96f1157a7c08b5b189b16b47bc9db2332269d6680a196341bf30046330d15388 \ + --hash=sha256:aec5a6c9864be7df2240c382740fcf3b96928c46604eaa7f3091f58b878c0bb6 \ + --hash=sha256:b0afd054cd42f3d213bf82c629efb1ee5f22eba35bf0eec88ea9ea7304f511a2 \ + --hash=sha256:ced4e447ae29ca194449a3f1ce132ded8fcab06971ef5f618605aacaa612beac \ + --hash=sha256:d1f6198ee6d9148405e49887803907fe8962a23e6c6f83ea7d98f1c0de375695 \ + --hash=sha256:e124352fd3db36a9d4a21c1aa27fd5d051e621845cb87fb851c08f4f75ce8be6 \ + --hash=sha256:e422abdec8b5fa8462aa016786680720d78bdce7a30c652b7fadf83a4ba35336 \ + --hash=sha256:ef8b72fa70b348724ff1218267e7f7375b8de4e8194d1636ee60510aae104cd0 \ + --hash=sha256:f0c64d1bd842ca2633e74a1a28033d139368ad959872533b1bab8c80e8240a0c \ + --hash=sha256:f24077a3b5298a5a06a8e0536e3ea9ec60e4c7ac486755e5fb6e6ea9b3500106 \ + --hash=sha256:fdd188c8a6ef8769f148f88f859884507b954cc64db6b52f66ef199bb9ad660a \ + --hash=sha256:fe913f20024eb2cb2f323e42a64bdf2911bb9738a15dba7d3cce48151034e3a8 # via # gcp-releasetool # secretstorage @@ -396,9 +394,9 @@ pycparser==2.21 \ --hash=sha256:8ee45429555515e1f6b185e78100aea234072576aa43ab53aefcae078162fca9 \ --hash=sha256:e644fdec12f7872f86c58ff790da456218b10f863970249516d60a5eaca77206 # via cffi -pygments==2.15.0 \ - --hash=sha256:77a3299119af881904cd5ecd1ac6a66214b6e9bed1f2db16993b54adede64094 \ - --hash=sha256:f7e36cffc4c517fbc252861b9a6e4644ca0e5abadf9a113c72d1358ad09b9500 +pygments==2.13.0 \ + --hash=sha256:56a8508ae95f98e2b9bdf93a6be5ae3f7d8af858b43e02c5a2ff083726be40c1 \ + --hash=sha256:f643f331ab57ba3c9d89212ee4a2dabc6e94f117cf4eefde99a0574720d14c42 # via # readme-renderer # rich @@ -421,9 +419,9 @@ readme-renderer==37.3 \ --hash=sha256:cd653186dfc73055656f090f227f5cb22a046d7f71a841dfa305f55c9a513273 \ --hash=sha256:f67a16caedfa71eef48a31b39708637a6f4664c4394801a7b0d6432d13907343 # via twine -requests==2.31.0 \ - --hash=sha256:58cd2187c01e70e6e26505bca751777aa9f2ee0b7f4300988b709f44e013003f \ - --hash=sha256:942c5a758f98d790eaed1a29cb6eefc7ffb0d1cf7af05c3d2791656dbd6ad1e1 +requests==2.28.1 \ + --hash=sha256:7c5599b102feddaa661c826c56ab4fee28bfd17f5abca1ebbe3e7f19d7c97983 \ + --hash=sha256:8fefa2a1a1365bf5520aac41836fbee479da67864514bdb821f31ce07ce65349 # via # gcp-releasetool # google-api-core diff --git a/.kokoro/trampoline.sh b/.kokoro/trampoline.sh index d85b1f2676..f39236e943 100755 --- a/.kokoro/trampoline.sh +++ b/.kokoro/trampoline.sh @@ -1,5 +1,5 @@ #!/bin/bash -# Copyright 2023 Google LLC +# Copyright 2017 Google Inc. # # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. diff --git a/.kokoro/trampoline_v2.sh b/.kokoro/trampoline_v2.sh index 59a7cf3a93..4af6cdc26d 100755 --- a/.kokoro/trampoline_v2.sh +++ b/.kokoro/trampoline_v2.sh @@ -1,5 +1,5 @@ #!/usr/bin/env bash -# Copyright 2023 Google LLC +# Copyright 2020 Google LLC # # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. 
diff --git a/.trampolinerc b/.trampolinerc index a7dfeb42c6..0eee72ab62 100644 --- a/.trampolinerc +++ b/.trampolinerc @@ -1,4 +1,4 @@ -# Copyright 2023 Google LLC +# Copyright 2020 Google LLC # # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. @@ -12,6 +12,8 @@ # See the License for the specific language governing permissions and # limitations under the License. +# Template for .trampolinerc + # Add required env vars here. required_envvars+=( ) From 66f80c2c35afc349251307bd7b67a207fafe3fc6 Mon Sep 17 00:00:00 2001 From: Owl Bot Date: Tue, 12 Sep 2023 19:00:04 +0000 Subject: [PATCH 13/33] =?UTF-8?q?=F0=9F=A6=89=20Updates=20from=20OwlBot=20?= =?UTF-8?q?post-processor?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit See https://github.com/googleapis/repo-automation-bots/blob/main/packages/owl-bot/README.md --- .github/auto-label.yaml | 2 +- .kokoro/build.sh | 2 +- .kokoro/docker/docs/Dockerfile | 2 +- .kokoro/populate-secrets.sh | 2 +- .kokoro/release.sh | 2 +- .kokoro/release/common.cfg | 9 +++++ .kokoro/requirements.txt | 64 ++++++++++++++++++---------------- .kokoro/trampoline.sh | 2 +- .kokoro/trampoline_v2.sh | 2 +- .trampolinerc | 4 +-- 10 files changed, 50 insertions(+), 41 deletions(-) diff --git a/.github/auto-label.yaml b/.github/auto-label.yaml index 41bff0b537..b2016d119b 100644 --- a/.github/auto-label.yaml +++ b/.github/auto-label.yaml @@ -1,4 +1,4 @@ -# Copyright 2022 Google LLC +# Copyright 2023 Google LLC # # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. diff --git a/.kokoro/build.sh b/.kokoro/build.sh index a8340f3a58..ecf29aa67f 100755 --- a/.kokoro/build.sh +++ b/.kokoro/build.sh @@ -1,5 +1,5 @@ #!/bin/bash -# Copyright 2018 Google LLC +# Copyright 2023 Google LLC # # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. diff --git a/.kokoro/docker/docs/Dockerfile b/.kokoro/docker/docs/Dockerfile index f8137d0ae4..8e39a2cc43 100644 --- a/.kokoro/docker/docs/Dockerfile +++ b/.kokoro/docker/docs/Dockerfile @@ -1,4 +1,4 @@ -# Copyright 2020 Google LLC +# Copyright 2023 Google LLC # # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. diff --git a/.kokoro/populate-secrets.sh b/.kokoro/populate-secrets.sh index f52514257e..6f3972140e 100755 --- a/.kokoro/populate-secrets.sh +++ b/.kokoro/populate-secrets.sh @@ -1,5 +1,5 @@ #!/bin/bash -# Copyright 2020 Google LLC. +# Copyright 2023 Google LLC. # # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. diff --git a/.kokoro/release.sh b/.kokoro/release.sh index 336f0eca1e..ea4f0153bf 100755 --- a/.kokoro/release.sh +++ b/.kokoro/release.sh @@ -1,5 +1,5 @@ #!/bin/bash -# Copyright 2020 Google LLC +# Copyright 2023 Google LLC # # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. 
diff --git a/.kokoro/release/common.cfg b/.kokoro/release/common.cfg index 72000839ce..1f5dced3c1 100644 --- a/.kokoro/release/common.cfg +++ b/.kokoro/release/common.cfg @@ -38,3 +38,12 @@ env_vars: { key: "SECRET_MANAGER_KEYS" value: "releasetool-publish-reporter-app,releasetool-publish-reporter-googleapis-installation,releasetool-publish-reporter-pem" } + +# Store the packages we uploaded to PyPI. That way, we have a record of exactly +# what we published, which we can use to generate SBOMs and attestations. +action { + define_artifacts { + regex: "github/gapic-generator-python/**/*.tar.gz" + strip_prefix: "github/gapic-generator-python" + } +} diff --git a/.kokoro/requirements.txt b/.kokoro/requirements.txt index 66a2172a76..029bd342de 100644 --- a/.kokoro/requirements.txt +++ b/.kokoro/requirements.txt @@ -20,9 +20,9 @@ cachetools==5.2.0 \ --hash=sha256:6a94c6402995a99c3970cc7e4884bb60b4a8639938157eeed436098bf9831757 \ --hash=sha256:f9f17d2aec496a9aa6b76f53e3b614c965223c061982d434d160f930c698a9db # via google-auth -certifi==2022.12.7 \ - --hash=sha256:35824b4c3a97115964b408844d64aa14db1cc518f6562e8d7261699d1350a9e3 \ - --hash=sha256:4ad3232f5e926d6718ec31cfc1fcadfde020920e278684144551c91769c7bc18 +certifi==2023.7.22 \ + --hash=sha256:539cc1d13202e33ca466e88b2807e29f4c13049d6d87031a3c110744495cb082 \ + --hash=sha256:92d6037539857d8206b8f6ae472e8b77db8058fec5937a1ef3f54304089edbb9 # via requests cffi==1.15.1 \ --hash=sha256:00a9ed42e88df81ffae7a8ab6d9356b371399b91dbdf0c3cb1e84c03a13aceb5 \ @@ -113,28 +113,30 @@ commonmark==0.9.1 \ --hash=sha256:452f9dc859be7f06631ddcb328b6919c67984aca654e5fefb3914d54691aed60 \ --hash=sha256:da2f38c92590f83de410ba1a3cbceafbc74fee9def35f9251ba9a971d6d66fd9 # via rich -cryptography==39.0.1 \ - --hash=sha256:0f8da300b5c8af9f98111ffd512910bc792b4c77392a9523624680f7956a99d4 \ - --hash=sha256:35f7c7d015d474f4011e859e93e789c87d21f6f4880ebdc29896a60403328f1f \ - --hash=sha256:5aa67414fcdfa22cf052e640cb5ddc461924a045cacf325cd164e65312d99502 \ - --hash=sha256:5d2d8b87a490bfcd407ed9d49093793d0f75198a35e6eb1a923ce1ee86c62b41 \ - --hash=sha256:6687ef6d0a6497e2b58e7c5b852b53f62142cfa7cd1555795758934da363a965 \ - --hash=sha256:6f8ba7f0328b79f08bdacc3e4e66fb4d7aab0c3584e0bd41328dce5262e26b2e \ - --hash=sha256:706843b48f9a3f9b9911979761c91541e3d90db1ca905fd63fee540a217698bc \ - --hash=sha256:807ce09d4434881ca3a7594733669bd834f5b2c6d5c7e36f8c00f691887042ad \ - --hash=sha256:83e17b26de248c33f3acffb922748151d71827d6021d98c70e6c1a25ddd78505 \ - --hash=sha256:96f1157a7c08b5b189b16b47bc9db2332269d6680a196341bf30046330d15388 \ - --hash=sha256:aec5a6c9864be7df2240c382740fcf3b96928c46604eaa7f3091f58b878c0bb6 \ - --hash=sha256:b0afd054cd42f3d213bf82c629efb1ee5f22eba35bf0eec88ea9ea7304f511a2 \ - --hash=sha256:ced4e447ae29ca194449a3f1ce132ded8fcab06971ef5f618605aacaa612beac \ - --hash=sha256:d1f6198ee6d9148405e49887803907fe8962a23e6c6f83ea7d98f1c0de375695 \ - --hash=sha256:e124352fd3db36a9d4a21c1aa27fd5d051e621845cb87fb851c08f4f75ce8be6 \ - --hash=sha256:e422abdec8b5fa8462aa016786680720d78bdce7a30c652b7fadf83a4ba35336 \ - --hash=sha256:ef8b72fa70b348724ff1218267e7f7375b8de4e8194d1636ee60510aae104cd0 \ - --hash=sha256:f0c64d1bd842ca2633e74a1a28033d139368ad959872533b1bab8c80e8240a0c \ - --hash=sha256:f24077a3b5298a5a06a8e0536e3ea9ec60e4c7ac486755e5fb6e6ea9b3500106 \ - --hash=sha256:fdd188c8a6ef8769f148f88f859884507b954cc64db6b52f66ef199bb9ad660a \ - --hash=sha256:fe913f20024eb2cb2f323e42a64bdf2911bb9738a15dba7d3cce48151034e3a8 +cryptography==41.0.3 \ + 
--hash=sha256:0d09fb5356f975974dbcb595ad2d178305e5050656affb7890a1583f5e02a306 \ + --hash=sha256:23c2d778cf829f7d0ae180600b17e9fceea3c2ef8b31a99e3c694cbbf3a24b84 \ + --hash=sha256:3fb248989b6363906827284cd20cca63bb1a757e0a2864d4c1682a985e3dca47 \ + --hash=sha256:41d7aa7cdfded09b3d73a47f429c298e80796c8e825ddfadc84c8a7f12df212d \ + --hash=sha256:42cb413e01a5d36da9929baa9d70ca90d90b969269e5a12d39c1e0d475010116 \ + --hash=sha256:4c2f0d35703d61002a2bbdcf15548ebb701cfdd83cdc12471d2bae80878a4207 \ + --hash=sha256:4fd871184321100fb400d759ad0cddddf284c4b696568204d281c902fc7b0d81 \ + --hash=sha256:5259cb659aa43005eb55a0e4ff2c825ca111a0da1814202c64d28a985d33b087 \ + --hash=sha256:57a51b89f954f216a81c9d057bf1a24e2f36e764a1ca9a501a6964eb4a6800dd \ + --hash=sha256:652627a055cb52a84f8c448185922241dd5217443ca194d5739b44612c5e6507 \ + --hash=sha256:67e120e9a577c64fe1f611e53b30b3e69744e5910ff3b6e97e935aeb96005858 \ + --hash=sha256:6af1c6387c531cd364b72c28daa29232162010d952ceb7e5ca8e2827526aceae \ + --hash=sha256:6d192741113ef5e30d89dcb5b956ef4e1578f304708701b8b73d38e3e1461f34 \ + --hash=sha256:7efe8041897fe7a50863e51b77789b657a133c75c3b094e51b5e4b5cec7bf906 \ + --hash=sha256:84537453d57f55a50a5b6835622ee405816999a7113267739a1b4581f83535bd \ + --hash=sha256:8f09daa483aedea50d249ef98ed500569841d6498aa9c9f4b0531b9964658922 \ + --hash=sha256:95dd7f261bb76948b52a5330ba5202b91a26fbac13ad0e9fc8a3ac04752058c7 \ + --hash=sha256:a74fbcdb2a0d46fe00504f571a2a540532f4c188e6ccf26f1f178480117b33c4 \ + --hash=sha256:a983e441a00a9d57a4d7c91b3116a37ae602907a7618b882c8013b5762e80574 \ + --hash=sha256:ab8de0d091acbf778f74286f4989cf3d1528336af1b59f3e5d2ebca8b5fe49e1 \ + --hash=sha256:aeb57c421b34af8f9fe830e1955bf493a86a7996cc1338fe41b30047d16e962c \ + --hash=sha256:ce785cf81a7bdade534297ef9e490ddff800d956625020ab2ec2780a556c313e \ + --hash=sha256:d0d651aa754ef58d75cec6edfbd21259d93810b73f6ec246436a21b7841908de # via # gcp-releasetool # secretstorage @@ -394,9 +396,9 @@ pycparser==2.21 \ --hash=sha256:8ee45429555515e1f6b185e78100aea234072576aa43ab53aefcae078162fca9 \ --hash=sha256:e644fdec12f7872f86c58ff790da456218b10f863970249516d60a5eaca77206 # via cffi -pygments==2.13.0 \ - --hash=sha256:56a8508ae95f98e2b9bdf93a6be5ae3f7d8af858b43e02c5a2ff083726be40c1 \ - --hash=sha256:f643f331ab57ba3c9d89212ee4a2dabc6e94f117cf4eefde99a0574720d14c42 +pygments==2.15.0 \ + --hash=sha256:77a3299119af881904cd5ecd1ac6a66214b6e9bed1f2db16993b54adede64094 \ + --hash=sha256:f7e36cffc4c517fbc252861b9a6e4644ca0e5abadf9a113c72d1358ad09b9500 # via # readme-renderer # rich @@ -419,9 +421,9 @@ readme-renderer==37.3 \ --hash=sha256:cd653186dfc73055656f090f227f5cb22a046d7f71a841dfa305f55c9a513273 \ --hash=sha256:f67a16caedfa71eef48a31b39708637a6f4664c4394801a7b0d6432d13907343 # via twine -requests==2.28.1 \ - --hash=sha256:7c5599b102feddaa661c826c56ab4fee28bfd17f5abca1ebbe3e7f19d7c97983 \ - --hash=sha256:8fefa2a1a1365bf5520aac41836fbee479da67864514bdb821f31ce07ce65349 +requests==2.31.0 \ + --hash=sha256:58cd2187c01e70e6e26505bca751777aa9f2ee0b7f4300988b709f44e013003f \ + --hash=sha256:942c5a758f98d790eaed1a29cb6eefc7ffb0d1cf7af05c3d2791656dbd6ad1e1 # via # gcp-releasetool # google-api-core diff --git a/.kokoro/trampoline.sh b/.kokoro/trampoline.sh index f39236e943..d85b1f2676 100755 --- a/.kokoro/trampoline.sh +++ b/.kokoro/trampoline.sh @@ -1,5 +1,5 @@ #!/bin/bash -# Copyright 2017 Google Inc. +# Copyright 2023 Google LLC # # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. 
diff --git a/.kokoro/trampoline_v2.sh b/.kokoro/trampoline_v2.sh index 4af6cdc26d..59a7cf3a93 100755 --- a/.kokoro/trampoline_v2.sh +++ b/.kokoro/trampoline_v2.sh @@ -1,5 +1,5 @@ #!/usr/bin/env bash -# Copyright 2020 Google LLC +# Copyright 2023 Google LLC # # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. diff --git a/.trampolinerc b/.trampolinerc index 0eee72ab62..a7dfeb42c6 100644 --- a/.trampolinerc +++ b/.trampolinerc @@ -1,4 +1,4 @@ -# Copyright 2020 Google LLC +# Copyright 2023 Google LLC # # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. @@ -12,8 +12,6 @@ # See the License for the specific language governing permissions and # limitations under the License. -# Template for .trampolinerc - # Add required env vars here. required_envvars+=( ) From ce342f4904e999441997dd07b6bf19464f3bac97 Mon Sep 17 00:00:00 2001 From: Daniel Sanche Date: Tue, 12 Sep 2023 14:38:42 -0700 Subject: [PATCH 14/33] ran autopep8 --- tests/system/test_retry_streaming.py | 30 ++++++++++++++-------- tests/system/test_retry_streaming_async.py | 21 ++++++++++----- 2 files changed, 34 insertions(+), 17 deletions(-) diff --git a/tests/system/test_retry_streaming.py b/tests/system/test_retry_streaming.py index c22d3dd0f6..29af7ec88f 100644 --- a/tests/system/test_retry_streaming.py +++ b/tests/system/test_retry_streaming.py @@ -31,7 +31,8 @@ def test_streaming_retry_success(sequence): """ Test a stream with a sigle success response """ - retry = retries.Retry(predicate=retries.if_exception_type(), is_stream=True) + retry = retries.Retry( + predicate=retries.if_exception_type(), is_stream=True) content = ["hello", "world"] seq = sequence.create_streaming_sequence( streaming_sequence={ @@ -56,7 +57,8 @@ def test_streaming_non_retryable_error(sequence): """ Test a retryable stream failing with non-retryable error """ - retry = retries.Retry(predicate=retries.if_exception_type(), is_stream=True) + retry = retries.Retry( + predicate=retries.if_exception_type(), is_stream=True) content = ["hello", "world"] error = Status( code=_code_from_exc(core_exceptions.ServiceUnavailable), @@ -86,7 +88,8 @@ def test_streaming_transient_retryable(sequence): Retryable errors should not be presented to the end user. """ retry = retries.Retry( - predicate=retries.if_exception_type(core_exceptions.ServiceUnavailable), + predicate=retries.if_exception_type( + core_exceptions.ServiceUnavailable), initial=0, maximum=0, timeout=1, @@ -127,7 +130,8 @@ def test_streaming_transient_retryable_partial_data(sequence): Wrapped stream should contain data from all attempts """ retry = retries.Retry( - predicate=retries.if_exception_type(core_exceptions.ServiceUnavailable), + predicate=retries.if_exception_type( + core_exceptions.ServiceUnavailable), initial=0, maximum=0, is_stream=True, @@ -150,7 +154,8 @@ def test_streaming_transient_retryable_partial_data(sequence): ) it = sequence.attempt_streaming_sequence(name=seq.name, retry=retry) results = [pb.content for pb in it] - assert results == ["hello"] * len(transient_error_list) + ["hello", "world"] + assert results == ["hello"] * \ + len(transient_error_list) + ["hello", "world"] # verify streaming report report = sequence.get_streaming_sequence_report( name=f"{seq.name}/streamingSequenceReport" @@ -168,7 +173,8 @@ def test_streaming_retryable_eventual_timeout(sequence): Should raise a retry error. 
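    In rough outline (with an invented always_unavailable target and made-up backoff numbers, not the Showcase sequence above), the behaviour being asserted is that an expired retry deadline surfaces as RetryError with the last transient error attached as __cause__:

        from google.api_core import exceptions
        from google.api_core import retry as retries

        @retries.Retry(
            predicate=retries.if_exception_type(exceptions.ServiceUnavailable),
            initial=0.01,
            maximum=0.01,
            timeout=0.05,
        )
        def always_unavailable():
            # every attempt fails with a retryable error until the deadline expires
            raise exceptions.ServiceUnavailable("transient error")

        try:
            always_unavailable()
        except exceptions.RetryError as exc:
            # the terminal RetryError wraps the last transient error
            assert isinstance(exc.__cause__, exceptions.ServiceUnavailable)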
""" retry = retries.Retry( - predicate=retries.if_exception_type(core_exceptions.ServiceUnavailable), + predicate=retries.if_exception_type( + core_exceptions.ServiceUnavailable), initial=0, maximum=0, timeout=0.35, @@ -180,7 +186,8 @@ def test_streaming_retryable_eventual_timeout(sequence): message="transient error", ) transient_error_list = [ - {"status": error, "response_index": 1, "delay": timedelta(seconds=0.15)} + {"status": error, "response_index": 1, + "delay": timedelta(seconds=0.15)} ] * 10 responses = transient_error_list + [ {"status": Status(code=0), "response_index": len(content)} @@ -231,7 +238,8 @@ def on_error(exc): core_exceptions.DeadlineExceeded, core_exceptions.NotFound, ] - responses = [{"status": Status(code=_code_from_exc(exc))} for exc in errors] + responses = [{"status": Status(code=_code_from_exc(exc))} + for exc in errors] seq = sequence.create_streaming_sequence( streaming_sequence={ "name": __name__, @@ -268,7 +276,8 @@ def test_streaming_retry_sleep_generator( should be able to pass in sleep generator to control backoff """ retry = retries.Retry( - predicate=retries.if_exception_type(core_exceptions.ServiceUnavailable), + predicate=retries.if_exception_type( + core_exceptions.ServiceUnavailable), initial=initial, maximum=maximum, multiplier=multiplier, @@ -294,7 +303,8 @@ def test_streaming_retry_sleep_generator( # make sleep generator deterministic mock_uniform.side_effect = lambda a, b: b with mock.patch("time.sleep") as mock_sleep: - it = sequence.attempt_streaming_sequence(name=seq.name, retry=retry) + it = sequence.attempt_streaming_sequence( + name=seq.name, retry=retry) [pb.content for pb in it] assert mock_sleep.call_count == len(expected) # ensure that sleep times match expected diff --git a/tests/system/test_retry_streaming_async.py b/tests/system/test_retry_streaming_async.py index 1a91e04bae..155f9de9df 100644 --- a/tests/system/test_retry_streaming_async.py +++ b/tests/system/test_retry_streaming_async.py @@ -90,7 +90,8 @@ async def test_async_streaming_transient_retryable(async_sequence): Retryable errors should not be presented to the end user. """ retry = retries_async.AsyncRetry( - predicate=retries.if_exception_type(core_exceptions.ServiceUnavailable), + predicate=retries.if_exception_type( + core_exceptions.ServiceUnavailable), initial=0, maximum=0, timeout=1, @@ -132,7 +133,8 @@ async def test_async_streaming_transient_retryable_partial_data(async_sequence): Wrapped stream should contain data from all attempts """ retry = retries_async.AsyncRetry( - predicate=retries.if_exception_type(core_exceptions.ServiceUnavailable), + predicate=retries.if_exception_type( + core_exceptions.ServiceUnavailable), initial=0, maximum=0, is_stream=True, @@ -155,7 +157,8 @@ async def test_async_streaming_transient_retryable_partial_data(async_sequence): ) it = await async_sequence.attempt_streaming_sequence(name=seq.name, retry=retry) results = [pb.content async for pb in it] - assert results == ["hello"] * len(transient_error_list) + ["hello", "world"] + assert results == ["hello"] * \ + len(transient_error_list) + ["hello", "world"] # verify streaming report report = await async_sequence.get_streaming_sequence_report( name=f"{seq.name}/streamingSequenceReport" @@ -174,7 +177,8 @@ async def test_async_streaming_retryable_eventual_timeout(async_sequence): Should raise a retry error. 
""" retry = retries_async.AsyncRetry( - predicate=retries.if_exception_type(core_exceptions.ServiceUnavailable), + predicate=retries.if_exception_type( + core_exceptions.ServiceUnavailable), initial=0, maximum=0, timeout=0.35, @@ -186,7 +190,8 @@ async def test_async_streaming_retryable_eventual_timeout(async_sequence): message="transient error", ) transient_error_list = [ - {"status": error, "response_index": 1, "delay": timedelta(seconds=0.15)} + {"status": error, "response_index": 1, + "delay": timedelta(seconds=0.15)} ] * 10 responses = transient_error_list + [ {"status": Status(code=0), "response_index": len(content)} @@ -238,7 +243,8 @@ def on_error(exc): core_exceptions.DeadlineExceeded, core_exceptions.NotFound, ] - responses = [{"status": Status(code=_code_from_exc(exc))} for exc in errors] + responses = [{"status": Status(code=_code_from_exc(exc))} + for exc in errors] seq = await async_sequence.create_streaming_sequence( streaming_sequence={ "name": __name__, @@ -276,7 +282,8 @@ async def test_async_streaming_retry_sleep_generator( should be able to pass in sleep generator to control backoff """ retry = retries_async.AsyncRetry( - predicate=retries.if_exception_type(core_exceptions.ServiceUnavailable), + predicate=retries.if_exception_type( + core_exceptions.ServiceUnavailable), initial=initial, maximum=maximum, multiplier=multiplier, From 0661c3c82b92c7888e480be11942cfe79c80ba40 Mon Sep 17 00:00:00 2001 From: Daniel Sanche Date: Tue, 12 Sep 2023 14:54:59 -0700 Subject: [PATCH 15/33] ran autopep on conftest --- tests/system/conftest.py | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/tests/system/conftest.py b/tests/system/conftest.py index b1b797c0ee..4934f72b68 100644 --- a/tests/system/conftest.py +++ b/tests/system/conftest.py @@ -72,7 +72,6 @@ def async_sequence(use_mtls, event_loop): ) - dir = os.path.dirname(__file__) with open(os.path.join(dir, "../cert/mtls.crt"), "rb") as fh: cert = fh.read() @@ -145,10 +144,12 @@ def use_mtls(request): def echo(use_mtls, request): return construct_client(EchoClient, use_mtls, transport_name=request.param) + @pytest.fixture(params=["grpc", "rest"]) def sequence(use_mtls, request): return construct_client(SequenceServiceClient, use_mtls, transport_name=request.param) + @pytest.fixture(params=["grpc", "rest"]) def identity(use_mtls, request): return construct_client(IdentityClient, use_mtls, transport_name=request.param) From c156cfd62761ef91aef307a3ec23c61535acd9f5 Mon Sep 17 00:00:00 2001 From: Daniel Sanche Date: Tue, 12 Sep 2023 14:59:55 -0700 Subject: [PATCH 16/33] import mock from unittest --- tests/system/test_retry_streaming.py | 2 +- tests/system/test_retry_streaming_async.py | 3 +-- 2 files changed, 2 insertions(+), 3 deletions(-) diff --git a/tests/system/test_retry_streaming.py b/tests/system/test_retry_streaming.py index 29af7ec88f..f637180fcc 100644 --- a/tests/system/test_retry_streaming.py +++ b/tests/system/test_retry_streaming.py @@ -13,7 +13,7 @@ # limitations under the License. import pytest -import mock +from unittest import mock from google.rpc.status_pb2 import Status from datetime import timedelta from google.api_core import retry as retries diff --git a/tests/system/test_retry_streaming_async.py b/tests/system/test_retry_streaming_async.py index 155f9de9df..865f0a3294 100644 --- a/tests/system/test_retry_streaming_async.py +++ b/tests/system/test_retry_streaming_async.py @@ -12,9 +12,8 @@ # See the License for the specific language governing permissions and # limitations under the License. 
-import asyncio import pytest -import mock +from unittest import mock from google.rpc.status_pb2 import Status from datetime import timedelta from google.api_core import retry as retries From 114f447914b7046b151e3f544345de10e113b588 Mon Sep 17 00:00:00 2001 From: Daniel Sanche Date: Tue, 12 Sep 2023 15:04:44 -0700 Subject: [PATCH 17/33] updated showcase version in github actions --- .github/workflows/tests.yaml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/.github/workflows/tests.yaml b/.github/workflows/tests.yaml index 1d464c51c9..34228f775c 100644 --- a/.github/workflows/tests.yaml +++ b/.github/workflows/tests.yaml @@ -14,7 +14,7 @@ concurrency: cancel-in-progress: true env: - SHOWCASE_VERSION: 0.25.0 + SHOWCASE_VERSION: 0.28.4 PROTOC_VERSION: 3.20.2 jobs: From c8a520f9eaacb5fee4023853d43ff8e0ac6596ef Mon Sep 17 00:00:00 2001 From: Daniel Sanche Date: Fri, 6 Oct 2023 16:10:28 -0700 Subject: [PATCH 18/33] changing stream test --- tests/system/test_retry_streaming.py | 17 +++++++++-------- 1 file changed, 9 insertions(+), 8 deletions(-) diff --git a/tests/system/test_retry_streaming.py b/tests/system/test_retry_streaming.py index f637180fcc..82fd974010 100644 --- a/tests/system/test_retry_streaming.py +++ b/tests/system/test_retry_streaming.py @@ -129,6 +129,7 @@ def test_streaming_transient_retryable_partial_data(sequence): Server stream yields some data before failing with a retryable error a number of times before success. Wrapped stream should contain data from all attempts """ + from google.protobuf.duration_pb2 import Duration retry = retries.Retry( predicate=retries.if_exception_type( core_exceptions.ServiceUnavailable), @@ -136,26 +137,26 @@ def test_streaming_transient_retryable_partial_data(sequence): maximum=0, is_stream=True, ) - content = ["hello", "world"] + content = ["hello", ",", "world"] error = Status( code=_code_from_exc(core_exceptions.ServiceUnavailable), message="transient error", ) - transient_error_list = [{"status": error, "response_index": 1}] * 3 + transient_error_list = [{"status": error, "response_index": 3, "delay":Duration(seconds=30)}] * 3 + responses = transient_error_list + [ {"status": Status(code=0), "response_index": len(content)} ] seq = sequence.create_streaming_sequence( - streaming_sequence={ - "name": __name__, - "content": " ".join(content), - "responses": responses, + streaming_sequence={ + "name": __name__, + "content": " ".join(content), + "responses": responses, } ) it = sequence.attempt_streaming_sequence(name=seq.name, retry=retry) results = [pb.content for pb in it] - assert results == ["hello"] * \ - len(transient_error_list) + ["hello", "world"] + assert results == ["hello", "hello", "hello", "hello", "world"] # verify streaming report report = sequence.get_streaming_sequence_report( name=f"{seq.name}/streamingSequenceReport" From ba9c97199255d2d9c0e5868f5dc9ae6514399124 Mon Sep 17 00:00:00 2001 From: Daniel Sanche Date: Fri, 4 Apr 2025 15:27:46 -0700 Subject: [PATCH 19/33] ran blacken --- tests/system/conftest.py | 2 +- tests/system/test_retry_streaming.py | 40 +++++++++------------- tests/system/test_retry_streaming_async.py | 25 ++++++-------- 3 files changed, 28 insertions(+), 39 deletions(-) diff --git a/tests/system/conftest.py b/tests/system/conftest.py index 4a1c802c72..ff2a63b36c 100644 --- a/tests/system/conftest.py +++ b/tests/system/conftest.py @@ -117,7 +117,7 @@ def async_sequence(use_mtls, event_loop): SequenceServiceAsyncClient, use_mtls, transport_name="grpc_asyncio", - 
channel_creator=aio.insecure_channel + channel_creator=aio.insecure_channel, ) diff --git a/tests/system/test_retry_streaming.py b/tests/system/test_retry_streaming.py index 82fd974010..bea6fccb91 100644 --- a/tests/system/test_retry_streaming.py +++ b/tests/system/test_retry_streaming.py @@ -31,8 +31,7 @@ def test_streaming_retry_success(sequence): """ Test a stream with a sigle success response """ - retry = retries.Retry( - predicate=retries.if_exception_type(), is_stream=True) + retry = retries.Retry(predicate=retries.if_exception_type(), is_stream=True) content = ["hello", "world"] seq = sequence.create_streaming_sequence( streaming_sequence={ @@ -57,8 +56,7 @@ def test_streaming_non_retryable_error(sequence): """ Test a retryable stream failing with non-retryable error """ - retry = retries.Retry( - predicate=retries.if_exception_type(), is_stream=True) + retry = retries.Retry(predicate=retries.if_exception_type(), is_stream=True) content = ["hello", "world"] error = Status( code=_code_from_exc(core_exceptions.ServiceUnavailable), @@ -88,8 +86,7 @@ def test_streaming_transient_retryable(sequence): Retryable errors should not be presented to the end user. """ retry = retries.Retry( - predicate=retries.if_exception_type( - core_exceptions.ServiceUnavailable), + predicate=retries.if_exception_type(core_exceptions.ServiceUnavailable), initial=0, maximum=0, timeout=1, @@ -130,9 +127,9 @@ def test_streaming_transient_retryable_partial_data(sequence): Wrapped stream should contain data from all attempts """ from google.protobuf.duration_pb2 import Duration + retry = retries.Retry( - predicate=retries.if_exception_type( - core_exceptions.ServiceUnavailable), + predicate=retries.if_exception_type(core_exceptions.ServiceUnavailable), initial=0, maximum=0, is_stream=True, @@ -142,16 +139,18 @@ def test_streaming_transient_retryable_partial_data(sequence): code=_code_from_exc(core_exceptions.ServiceUnavailable), message="transient error", ) - transient_error_list = [{"status": error, "response_index": 3, "delay":Duration(seconds=30)}] * 3 + transient_error_list = [ + {"status": error, "response_index": 3, "delay": Duration(seconds=30)} + ] * 3 responses = transient_error_list + [ {"status": Status(code=0), "response_index": len(content)} ] seq = sequence.create_streaming_sequence( - streaming_sequence={ - "name": __name__, - "content": " ".join(content), - "responses": responses, + streaming_sequence={ + "name": __name__, + "content": " ".join(content), + "responses": responses, } ) it = sequence.attempt_streaming_sequence(name=seq.name, retry=retry) @@ -174,8 +173,7 @@ def test_streaming_retryable_eventual_timeout(sequence): Should raise a retry error. 
""" retry = retries.Retry( - predicate=retries.if_exception_type( - core_exceptions.ServiceUnavailable), + predicate=retries.if_exception_type(core_exceptions.ServiceUnavailable), initial=0, maximum=0, timeout=0.35, @@ -187,8 +185,7 @@ def test_streaming_retryable_eventual_timeout(sequence): message="transient error", ) transient_error_list = [ - {"status": error, "response_index": 1, - "delay": timedelta(seconds=0.15)} + {"status": error, "response_index": 1, "delay": timedelta(seconds=0.15)} ] * 10 responses = transient_error_list + [ {"status": Status(code=0), "response_index": len(content)} @@ -239,8 +236,7 @@ def on_error(exc): core_exceptions.DeadlineExceeded, core_exceptions.NotFound, ] - responses = [{"status": Status(code=_code_from_exc(exc))} - for exc in errors] + responses = [{"status": Status(code=_code_from_exc(exc))} for exc in errors] seq = sequence.create_streaming_sequence( streaming_sequence={ "name": __name__, @@ -277,8 +273,7 @@ def test_streaming_retry_sleep_generator( should be able to pass in sleep generator to control backoff """ retry = retries.Retry( - predicate=retries.if_exception_type( - core_exceptions.ServiceUnavailable), + predicate=retries.if_exception_type(core_exceptions.ServiceUnavailable), initial=initial, maximum=maximum, multiplier=multiplier, @@ -304,8 +299,7 @@ def test_streaming_retry_sleep_generator( # make sleep generator deterministic mock_uniform.side_effect = lambda a, b: b with mock.patch("time.sleep") as mock_sleep: - it = sequence.attempt_streaming_sequence( - name=seq.name, retry=retry) + it = sequence.attempt_streaming_sequence(name=seq.name, retry=retry) [pb.content for pb in it] assert mock_sleep.call_count == len(expected) # ensure that sleep times match expected diff --git a/tests/system/test_retry_streaming_async.py b/tests/system/test_retry_streaming_async.py index 865f0a3294..7bc539ce13 100644 --- a/tests/system/test_retry_streaming_async.py +++ b/tests/system/test_retry_streaming_async.py @@ -89,8 +89,7 @@ async def test_async_streaming_transient_retryable(async_sequence): Retryable errors should not be presented to the end user. """ retry = retries_async.AsyncRetry( - predicate=retries.if_exception_type( - core_exceptions.ServiceUnavailable), + predicate=retries.if_exception_type(core_exceptions.ServiceUnavailable), initial=0, maximum=0, timeout=1, @@ -132,8 +131,7 @@ async def test_async_streaming_transient_retryable_partial_data(async_sequence): Wrapped stream should contain data from all attempts """ retry = retries_async.AsyncRetry( - predicate=retries.if_exception_type( - core_exceptions.ServiceUnavailable), + predicate=retries.if_exception_type(core_exceptions.ServiceUnavailable), initial=0, maximum=0, is_stream=True, @@ -156,8 +154,7 @@ async def test_async_streaming_transient_retryable_partial_data(async_sequence): ) it = await async_sequence.attempt_streaming_sequence(name=seq.name, retry=retry) results = [pb.content async for pb in it] - assert results == ["hello"] * \ - len(transient_error_list) + ["hello", "world"] + assert results == ["hello"] * len(transient_error_list) + ["hello", "world"] # verify streaming report report = await async_sequence.get_streaming_sequence_report( name=f"{seq.name}/streamingSequenceReport" @@ -176,8 +173,7 @@ async def test_async_streaming_retryable_eventual_timeout(async_sequence): Should raise a retry error. 
""" retry = retries_async.AsyncRetry( - predicate=retries.if_exception_type( - core_exceptions.ServiceUnavailable), + predicate=retries.if_exception_type(core_exceptions.ServiceUnavailable), initial=0, maximum=0, timeout=0.35, @@ -189,8 +185,7 @@ async def test_async_streaming_retryable_eventual_timeout(async_sequence): message="transient error", ) transient_error_list = [ - {"status": error, "response_index": 1, - "delay": timedelta(seconds=0.15)} + {"status": error, "response_index": 1, "delay": timedelta(seconds=0.15)} ] * 10 responses = transient_error_list + [ {"status": Status(code=0), "response_index": len(content)} @@ -242,8 +237,7 @@ def on_error(exc): core_exceptions.DeadlineExceeded, core_exceptions.NotFound, ] - responses = [{"status": Status(code=_code_from_exc(exc))} - for exc in errors] + responses = [{"status": Status(code=_code_from_exc(exc))} for exc in errors] seq = await async_sequence.create_streaming_sequence( streaming_sequence={ "name": __name__, @@ -281,8 +275,7 @@ async def test_async_streaming_retry_sleep_generator( should be able to pass in sleep generator to control backoff """ retry = retries_async.AsyncRetry( - predicate=retries.if_exception_type( - core_exceptions.ServiceUnavailable), + predicate=retries.if_exception_type(core_exceptions.ServiceUnavailable), initial=initial, maximum=maximum, multiplier=multiplier, @@ -308,7 +301,9 @@ async def test_async_streaming_retry_sleep_generator( # make sleep generator deterministic mock_uniform.side_effect = lambda a, b: b with mock.patch("asyncio.sleep") as mock_sleep: - it = await async_sequence.attempt_streaming_sequence(name=seq.name, retry=retry) + it = await async_sequence.attempt_streaming_sequence( + name=seq.name, retry=retry + ) [pb.content async for pb in it] assert mock_sleep.call_count == len(expected) # ensure that sleep times match expected From cbb8960f6954e970646f5dd887deeda10dc01128 Mon Sep 17 00:00:00 2001 From: Daniel Sanche Date: Fri, 4 Apr 2025 15:30:57 -0700 Subject: [PATCH 20/33] use StreamingRetry class --- tests/system/test_retry_streaming.py | 19 +++++++----------- tests/system/test_retry_streaming_async.py | 23 +++++++++------------- 2 files changed, 16 insertions(+), 26 deletions(-) diff --git a/tests/system/test_retry_streaming.py b/tests/system/test_retry_streaming.py index bea6fccb91..654965fcba 100644 --- a/tests/system/test_retry_streaming.py +++ b/tests/system/test_retry_streaming.py @@ -31,7 +31,7 @@ def test_streaming_retry_success(sequence): """ Test a stream with a sigle success response """ - retry = retries.Retry(predicate=retries.if_exception_type(), is_stream=True) + retry = retries.StreamingRetry(predicate=retries.if_exception_type()) content = ["hello", "world"] seq = sequence.create_streaming_sequence( streaming_sequence={ @@ -56,7 +56,7 @@ def test_streaming_non_retryable_error(sequence): """ Test a retryable stream failing with non-retryable error """ - retry = retries.Retry(predicate=retries.if_exception_type(), is_stream=True) + retry = retries.StreamingRetry(predicate=retries.if_exception_type()) content = ["hello", "world"] error = Status( code=_code_from_exc(core_exceptions.ServiceUnavailable), @@ -85,12 +85,11 @@ def test_streaming_transient_retryable(sequence): Server returns a retryable error a number of times before success. Retryable errors should not be presented to the end user. 
""" - retry = retries.Retry( + retry = retries.StreamingRetry( predicate=retries.if_exception_type(core_exceptions.ServiceUnavailable), initial=0, maximum=0, timeout=1, - is_stream=True, ) content = ["hello", "world"] error = Status( @@ -128,11 +127,10 @@ def test_streaming_transient_retryable_partial_data(sequence): """ from google.protobuf.duration_pb2 import Duration - retry = retries.Retry( + retry = retries.StreamingRetry( predicate=retries.if_exception_type(core_exceptions.ServiceUnavailable), initial=0, maximum=0, - is_stream=True, ) content = ["hello", ",", "world"] error = Status( @@ -172,12 +170,11 @@ def test_streaming_retryable_eventual_timeout(sequence): Server returns a retryable error a number of times before reaching timeout. Should raise a retry error. """ - retry = retries.Retry( + retry = retries.StreamingRetry( predicate=retries.if_exception_type(core_exceptions.ServiceUnavailable), initial=0, maximum=0, timeout=0.35, - is_stream=True, ) content = ["hello", "world"] error = Status( @@ -221,14 +218,13 @@ def test_streaming_retry_on_error(sequence): def on_error(exc): encountered_excs.append(exc) - retry = retries.Retry( + retry = retries.StreamingRetry( predicate=retries.if_exception_type( core_exceptions.ServiceUnavailable, core_exceptions.GatewayTimeout ), initial=0, maximum=0, on_error=on_error, - is_stream=True, ) content = ["hello", "world"] errors = [ @@ -272,12 +268,11 @@ def test_streaming_retry_sleep_generator( """ should be able to pass in sleep generator to control backoff """ - retry = retries.Retry( + retry = retries.StreamingRetry( predicate=retries.if_exception_type(core_exceptions.ServiceUnavailable), initial=initial, maximum=maximum, multiplier=multiplier, - is_stream=True, ) content = ["hello", "world"] error = Status( diff --git a/tests/system/test_retry_streaming_async.py b/tests/system/test_retry_streaming_async.py index 7bc539ce13..ed3060e0d5 100644 --- a/tests/system/test_retry_streaming_async.py +++ b/tests/system/test_retry_streaming_async.py @@ -28,8 +28,8 @@ async def test_async_streaming_retry_success(async_sequence): """ Test a stream with a sigle success response """ - retry = retries_async.AsyncRetry( - predicate=retries.if_exception_type(), is_stream=True + retry = retries_async.AsyncStreamingRetry( + predicate=retries.if_exception_type() ) content = ["hello", "world"] seq = await async_sequence.create_streaming_sequence( @@ -56,8 +56,8 @@ async def test_async_streaming_non_retryable_error(async_sequence): """ Test a retryable stream failing with non-retryable error """ - retry = retries_async.AsyncRetry( - predicate=retries.if_exception_type(), is_stream=True + retry = retries_async.AsyncStreamingRetry( + predicate=retries.if_exception_type() ) content = ["hello", "world"] error = Status( @@ -88,12 +88,11 @@ async def test_async_streaming_transient_retryable(async_sequence): Server returns a retryable error a number of times before success. Retryable errors should not be presented to the end user. """ - retry = retries_async.AsyncRetry( + retry = retries_async.AsyncStreamingRetry( predicate=retries.if_exception_type(core_exceptions.ServiceUnavailable), initial=0, maximum=0, timeout=1, - is_stream=True, ) content = ["hello", "world"] error = Status( @@ -130,11 +129,10 @@ async def test_async_streaming_transient_retryable_partial_data(async_sequence): Server stream yields some data before failing with a retryable error a number of times before success. 
Wrapped stream should contain data from all attempts """ - retry = retries_async.AsyncRetry( + retry = retries_async.AsyncStreamingRetry( predicate=retries.if_exception_type(core_exceptions.ServiceUnavailable), initial=0, maximum=0, - is_stream=True, ) content = ["hello", "world"] error = Status( @@ -172,12 +170,11 @@ async def test_async_streaming_retryable_eventual_timeout(async_sequence): Server returns a retryable error a number of times before reaching timeout. Should raise a retry error. """ - retry = retries_async.AsyncRetry( + retry = retries_async.AsyncStreamingRetry( predicate=retries.if_exception_type(core_exceptions.ServiceUnavailable), initial=0, maximum=0, timeout=0.35, - is_stream=True, ) content = ["hello", "world"] error = Status( @@ -222,14 +219,13 @@ async def test_async_streaming_retry_on_error(async_sequence): def on_error(exc): encountered_excs.append(exc) - retry = retries_async.AsyncRetry( + retry = retries_async.AsyncStreamingRetry( predicate=retries.if_exception_type( core_exceptions.ServiceUnavailable, core_exceptions.GatewayTimeout ), initial=0, maximum=0, on_error=on_error, - is_stream=True, ) content = ["hello", "world"] errors = [ @@ -274,12 +270,11 @@ async def test_async_streaming_retry_sleep_generator( """ should be able to pass in sleep generator to control backoff """ - retry = retries_async.AsyncRetry( + retry = retries_async.AsyncStreamingRetry( predicate=retries.if_exception_type(core_exceptions.ServiceUnavailable), initial=initial, maximum=maximum, multiplier=multiplier, - is_stream=True, ) content = ["hello", "world"] error = Status( From 31031fe5c186f0ee6b6c9f8436ed658b4938e01f Mon Sep 17 00:00:00 2001 From: Daniel Sanche Date: Fri, 4 Apr 2025 15:42:53 -0700 Subject: [PATCH 21/33] added back sequence fixture --- tests/system/conftest.py | 9 +++++++++ 1 file changed, 9 insertions(+) diff --git a/tests/system/conftest.py b/tests/system/conftest.py index ff2a63b36c..c477905cc7 100644 --- a/tests/system/conftest.py +++ b/tests/system/conftest.py @@ -258,6 +258,15 @@ def identity(use_mtls, request): return construct_client(IdentityClient, use_mtls, transport_name=request.param) +@pytest.fixture(params=["grpc", "rest"]) +def sequence(use_mtls, request): + return construct_client( + SequenceServiceClient, + use_mtls, + transport_name=request.param + ) + + @pytest.fixture(params=["grpc", "rest"]) def messaging(use_mtls, request): return construct_client(MessagingClient, use_mtls, transport_name=request.param) From be1c47aef707c2de9187f49101d0ecf1670bb610 Mon Sep 17 00:00:00 2001 From: Daniel Sanche Date: Fri, 4 Apr 2025 15:43:08 -0700 Subject: [PATCH 22/33] fixed import --- tests/system/test_retry_streaming_async.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tests/system/test_retry_streaming_async.py b/tests/system/test_retry_streaming_async.py index ed3060e0d5..3d507f869e 100644 --- a/tests/system/test_retry_streaming_async.py +++ b/tests/system/test_retry_streaming_async.py @@ -17,7 +17,7 @@ from google.rpc.status_pb2 import Status from datetime import timedelta from google.api_core import retry as retries -from google.api_core import retry_async as retries_async +from google.api_core import retry_streaming_async as retries_async from google.api_core import exceptions as core_exceptions from test_retry_streaming import _code_from_exc From c95aded75bcc7dedab383959bc18b8ee5ea340ab Mon Sep 17 00:00:00 2001 From: Daniel Sanche Date: Fri, 4 Apr 2025 15:45:08 -0700 Subject: [PATCH 23/33] ran blacken --- tests/system/conftest.py | 4 +--- 
tests/system/test_retry_streaming_async.py | 8 ++------ 2 files changed, 3 insertions(+), 9 deletions(-) diff --git a/tests/system/conftest.py b/tests/system/conftest.py index c477905cc7..08791cc59c 100644 --- a/tests/system/conftest.py +++ b/tests/system/conftest.py @@ -261,9 +261,7 @@ def identity(use_mtls, request): @pytest.fixture(params=["grpc", "rest"]) def sequence(use_mtls, request): return construct_client( - SequenceServiceClient, - use_mtls, - transport_name=request.param + SequenceServiceClient, use_mtls, transport_name=request.param ) diff --git a/tests/system/test_retry_streaming_async.py b/tests/system/test_retry_streaming_async.py index 3d507f869e..f35d84b2c2 100644 --- a/tests/system/test_retry_streaming_async.py +++ b/tests/system/test_retry_streaming_async.py @@ -28,9 +28,7 @@ async def test_async_streaming_retry_success(async_sequence): """ Test a stream with a sigle success response """ - retry = retries_async.AsyncStreamingRetry( - predicate=retries.if_exception_type() - ) + retry = retries_async.AsyncStreamingRetry(predicate=retries.if_exception_type()) content = ["hello", "world"] seq = await async_sequence.create_streaming_sequence( streaming_sequence={ @@ -56,9 +54,7 @@ async def test_async_streaming_non_retryable_error(async_sequence): """ Test a retryable stream failing with non-retryable error """ - retry = retries_async.AsyncStreamingRetry( - predicate=retries.if_exception_type() - ) + retry = retries_async.AsyncStreamingRetry(predicate=retries.if_exception_type()) content = ["hello", "world"] error = Status( code=_code_from_exc(core_exceptions.ServiceUnavailable), From 3d98926ffdf073a7e25b1a642554f639e6cce3c2 Mon Sep 17 00:00:00 2001 From: Daniel Sanche Date: Fri, 4 Apr 2025 15:46:44 -0700 Subject: [PATCH 24/33] fixed import --- tests/system/test_retry_streaming_async.py | 15 +++++++-------- 1 file changed, 7 insertions(+), 8 deletions(-) diff --git a/tests/system/test_retry_streaming_async.py b/tests/system/test_retry_streaming_async.py index f35d84b2c2..da4939787b 100644 --- a/tests/system/test_retry_streaming_async.py +++ b/tests/system/test_retry_streaming_async.py @@ -17,7 +17,6 @@ from google.rpc.status_pb2 import Status from datetime import timedelta from google.api_core import retry as retries -from google.api_core import retry_streaming_async as retries_async from google.api_core import exceptions as core_exceptions from test_retry_streaming import _code_from_exc @@ -28,7 +27,7 @@ async def test_async_streaming_retry_success(async_sequence): """ Test a stream with a sigle success response """ - retry = retries_async.AsyncStreamingRetry(predicate=retries.if_exception_type()) + retry = retries.AsyncStreamingRetry(predicate=retries.if_exception_type()) content = ["hello", "world"] seq = await async_sequence.create_streaming_sequence( streaming_sequence={ @@ -54,7 +53,7 @@ async def test_async_streaming_non_retryable_error(async_sequence): """ Test a retryable stream failing with non-retryable error """ - retry = retries_async.AsyncStreamingRetry(predicate=retries.if_exception_type()) + retry = retries.AsyncStreamingRetry(predicate=retries.if_exception_type()) content = ["hello", "world"] error = Status( code=_code_from_exc(core_exceptions.ServiceUnavailable), @@ -84,7 +83,7 @@ async def test_async_streaming_transient_retryable(async_sequence): Server returns a retryable error a number of times before success. Retryable errors should not be presented to the end user. 
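    The asyncio counterpart looks roughly like the sketch below (flaky_async_stream is an invented stand-in for the Showcase stream); awaiting the wrapped call returns the retrying async iterator, which is why attempt_streaming_sequence is awaited throughout these tests:

        import asyncio

        from google.api_core import exceptions
        from google.api_core import retry as retries

        attempts = {"count": 0}

        async def flaky_async_stream():
            attempts["count"] += 1
            if attempts["count"] < 2:
                # the first attempt fails before yielding anything
                raise exceptions.ServiceUnavailable("transient error")
            for word in ("hello", "world"):
                yield word

        async def main():
            wrapped = retries.AsyncStreamingRetry(
                predicate=retries.if_exception_type(exceptions.ServiceUnavailable),
                initial=0.01,
                maximum=0.01,
            )(flaky_async_stream)
            # awaiting the wrapped call yields the retrying async iterator
            it = await wrapped()
            return [word async for word in it]

        assert asyncio.run(main()) == ["hello", "world"]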
""" - retry = retries_async.AsyncStreamingRetry( + retry = retries.AsyncStreamingRetry( predicate=retries.if_exception_type(core_exceptions.ServiceUnavailable), initial=0, maximum=0, @@ -125,7 +124,7 @@ async def test_async_streaming_transient_retryable_partial_data(async_sequence): Server stream yields some data before failing with a retryable error a number of times before success. Wrapped stream should contain data from all attempts """ - retry = retries_async.AsyncStreamingRetry( + retry = retries.AsyncStreamingRetry( predicate=retries.if_exception_type(core_exceptions.ServiceUnavailable), initial=0, maximum=0, @@ -166,7 +165,7 @@ async def test_async_streaming_retryable_eventual_timeout(async_sequence): Server returns a retryable error a number of times before reaching timeout. Should raise a retry error. """ - retry = retries_async.AsyncStreamingRetry( + retry = retries.AsyncStreamingRetry( predicate=retries.if_exception_type(core_exceptions.ServiceUnavailable), initial=0, maximum=0, @@ -215,7 +214,7 @@ async def test_async_streaming_retry_on_error(async_sequence): def on_error(exc): encountered_excs.append(exc) - retry = retries_async.AsyncStreamingRetry( + retry = retries.AsyncStreamingRetry( predicate=retries.if_exception_type( core_exceptions.ServiceUnavailable, core_exceptions.GatewayTimeout ), @@ -266,7 +265,7 @@ async def test_async_streaming_retry_sleep_generator( """ should be able to pass in sleep generator to control backoff """ - retry = retries_async.AsyncStreamingRetry( + retry = retries.AsyncStreamingRetry( predicate=retries.if_exception_type(core_exceptions.ServiceUnavailable), initial=initial, maximum=maximum, From a29c9beec09591da6a552e38da813c72b3c665ef Mon Sep 17 00:00:00 2001 From: Daniel Sanche Date: Fri, 4 Apr 2025 16:27:10 -0700 Subject: [PATCH 25/33] added skipif for api_core version --- tests/system/test_retry_streaming.py | 4 ++++ tests/system/test_retry_streaming_async.py | 4 ++++ 2 files changed, 8 insertions(+) diff --git a/tests/system/test_retry_streaming.py b/tests/system/test_retry_streaming.py index 654965fcba..98f6eda717 100644 --- a/tests/system/test_retry_streaming.py +++ b/tests/system/test_retry_streaming.py @@ -18,6 +18,10 @@ from datetime import timedelta from google.api_core import retry as retries from google.api_core import exceptions as core_exceptions +from google.api_core.version import __version__ as api_core_version + +if [int(n) for n in api_core_version.split(".")] < [2, 16, 0]: + pytest.skip("streaming retries requires api_core v2.16.0+", allow_module_level=True) def _code_from_exc(exc): diff --git a/tests/system/test_retry_streaming_async.py b/tests/system/test_retry_streaming_async.py index da4939787b..2f53fa1fed 100644 --- a/tests/system/test_retry_streaming_async.py +++ b/tests/system/test_retry_streaming_async.py @@ -18,9 +18,13 @@ from datetime import timedelta from google.api_core import retry as retries from google.api_core import exceptions as core_exceptions +from google.api_core.version import __version__ as api_core_version from test_retry_streaming import _code_from_exc +if [int(n) for n in api_core_version.split(".")] < [2, 16, 0]: + pytest.skip("streaming retries requires api_core v2.16.0+", allow_module_level=True) + @pytest.mark.asyncio async def test_async_streaming_retry_success(async_sequence): From 8ee624da1ffec57186d78258cb68a791d98e9899 Mon Sep 17 00:00:00 2001 From: Daniel Sanche Date: Fri, 4 Apr 2025 16:39:09 -0700 Subject: [PATCH 26/33] changed test sequencing --- tests/system/test_retry_streaming.py | 4 
++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/tests/system/test_retry_streaming.py b/tests/system/test_retry_streaming.py index 98f6eda717..43ba3a5a12 100644 --- a/tests/system/test_retry_streaming.py +++ b/tests/system/test_retry_streaming.py @@ -136,13 +136,13 @@ def test_streaming_transient_retryable_partial_data(sequence): initial=0, maximum=0, ) - content = ["hello", ",", "world"] + content = ["hello", "world"] error = Status( code=_code_from_exc(core_exceptions.ServiceUnavailable), message="transient error", ) transient_error_list = [ - {"status": error, "response_index": 3, "delay": Duration(seconds=30)} + {"status": error, "response_index": 1, "delay": Duration(seconds=30)} ] * 3 responses = transient_error_list + [ From 657ba04063c41e8b8759692b8f6f47c030417626 Mon Sep 17 00:00:00 2001 From: Daniel Sanche Date: Fri, 4 Apr 2025 16:56:05 -0700 Subject: [PATCH 27/33] support rest_asyncio --- tests/system/conftest.py | 12 ++++++++---- 1 file changed, 8 insertions(+), 4 deletions(-) diff --git a/tests/system/conftest.py b/tests/system/conftest.py index 08791cc59c..bc794f0209 100644 --- a/tests/system/conftest.py +++ b/tests/system/conftest.py @@ -111,13 +111,17 @@ def async_identity(use_mtls, request, event_loop): credentials=async_anonymous_credentials(), ) - @pytest.fixture - def async_sequence(use_mtls, event_loop): + @pytest.fixture(params=["grpc_asyncio", "rest_asyncio"]) + def async_sequence(use_mtls, request, event_loop): + transport = request.param return construct_client( SequenceServiceAsyncClient, use_mtls, - transport_name="grpc_asyncio", - channel_creator=aio.insecure_channel, + transport_name=transport, + channel_creator=( + aio.insecure_channel if request.param == "grpc_asyncio" else None + ), + credentials=async_anonymous_credentials(), ) From db86f1e46c7818c6860472d83e503089bd68377e Mon Sep 17 00:00:00 2001 From: Daniel Sanche Date: Fri, 4 Apr 2025 17:10:17 -0700 Subject: [PATCH 28/33] skip if transport is missing --- tests/system/conftest.py | 10 ++++++++++ 1 file changed, 10 insertions(+) diff --git a/tests/system/conftest.py b/tests/system/conftest.py index bc794f0209..287852b9a7 100644 --- a/tests/system/conftest.py +++ b/tests/system/conftest.py @@ -61,6 +61,14 @@ HAS_ASYNC_REST_IDENTITY_TRANSPORT = True except: HAS_ASYNC_REST_IDENTITY_TRANSPORT = False + try: + from google.showcase_v1beta1.services.seuence.transports import ( + AsyncSequenceServiceRestTransport, + ) + + HAS_ASYNC_REST_SEQUENCE_TRANSPORT = True + except: + HAS_ASYNC_REST_SEQUENCE_TRANSPORT = False # TODO: use async auth anon credentials by default once the minimum version of google-auth is upgraded. # See related issue: https://github.com/googleapis/gapic-generator-python/issues/2107. 
@@ -114,6 +122,8 @@ def async_identity(use_mtls, request, event_loop):
     @pytest.fixture(params=["grpc_asyncio", "rest_asyncio"])
     def async_sequence(use_mtls, request, event_loop):
         transport = request.param
+        if transport == "rest_asyncio" and not HAS_ASYNC_REST_SEQUENCE_TRANSPORT:
+            pytest.skip("Skipping test with async rest.")
         return construct_client(
             SequenceServiceAsyncClient,
             use_mtls,

From 3cf58418caefa8c9a09d6ba1ac9974cb9676f135 Mon Sep 17 00:00:00 2001
From: Daniel Sanche
Date: Fri, 4 Apr 2025 17:10:47 -0700
Subject: [PATCH 29/33] skip known failing test

---
 tests/system/test_retry_streaming.py | 5 +++++
 1 file changed, 5 insertions(+)

diff --git a/tests/system/test_retry_streaming.py b/tests/system/test_retry_streaming.py
index 43ba3a5a12..2645e44726 100644
--- a/tests/system/test_retry_streaming.py
+++ b/tests/system/test_retry_streaming.py
@@ -128,9 +128,14 @@ def test_streaming_transient_retryable_partial_data(sequence):
     """
     Server stream yields some data before failing with a retryable error a number of times before success.
     Wrapped stream should contain data from all attempts
+
+    TODO: implement fix for rest client: https://github.com/googleapis/gapic-showcase/issues/1377
     """
     from google.protobuf.duration_pb2 import Duration
 
+    if sequence.transport == type(sequence).get_transport_class("rest"):
+        pytest.skip("Skipping due to known streaming issue in rest client")
+
     retry = retries.StreamingRetry(
         predicate=retries.if_exception_type(core_exceptions.ServiceUnavailable),
         initial=0,
         maximum=0,

From 4a58868624bbef19a0a8e6e475d069613c888f68 Mon Sep 17 00:00:00 2001
From: Daniel Sanche
Date: Fri, 4 Apr 2025 17:18:34 -0700
Subject: [PATCH 30/33] fix comparison

---
 tests/system/test_retry_streaming.py | 6 ++++--
 1 file changed, 4 insertions(+), 2 deletions(-)

diff --git a/tests/system/test_retry_streaming.py b/tests/system/test_retry_streaming.py
index 2645e44726..343c534a1d 100644
--- a/tests/system/test_retry_streaming.py
+++ b/tests/system/test_retry_streaming.py
@@ -129,11 +129,13 @@ def test_streaming_transient_retryable_partial_data(sequence):
     Server stream yields some data before failing with a retryable error a number of times before success.
     Wrapped stream should contain data from all attempts
 
-    TODO: implement fix for rest client: https://github.com/googleapis/gapic-showcase/issues/1377
+    TODO:
+    Test is currently skipped for rest clients due to known issue:
+    https://github.com/googleapis/gapic-showcase/issues/1377
     """
     from google.protobuf.duration_pb2 import Duration
 
-    if sequence.transport == type(sequence).get_transport_class("rest"):
+    if type(sequence.transport) == type(sequence).get_transport_class("rest"):
         pytest.skip("Skipping due to known streaming issue in rest client")
 
     retry = retries.StreamingRetry(

From 94924aefc8db3c08b57a396209869883b1a575fb Mon Sep 17 00:00:00 2001
From: Daniel Sanche
Date: Fri, 4 Apr 2025 17:35:31 -0700
Subject: [PATCH 31/33] skip AsyncMock test in 3.7

---
 tests/system/test_retry_streaming.py       | 2 +-
 tests/system/test_retry_streaming_async.py | 4 +++-
 2 files changed, 4 insertions(+), 2 deletions(-)

diff --git a/tests/system/test_retry_streaming.py b/tests/system/test_retry_streaming.py
index 343c534a1d..0876f1c572 100644
--- a/tests/system/test_retry_streaming.py
+++ b/tests/system/test_retry_streaming.py
@@ -135,7 +135,7 @@ def test_streaming_transient_retryable_partial_data(sequence):
     """
     from google.protobuf.duration_pb2 import Duration
 
-    if type(sequence.transport) == type(sequence).get_transport_class("rest"):
+    if isinstance(sequence.transport, type(sequence).get_transport_class("rest")):
         pytest.skip("Skipping due to known streaming issue in rest client")
 
     retry = retries.StreamingRetry(

diff --git a/tests/system/test_retry_streaming_async.py b/tests/system/test_retry_streaming_async.py
index 2f53fa1fed..2ac38fa803 100644
--- a/tests/system/test_retry_streaming_async.py
+++ b/tests/system/test_retry_streaming_async.py
@@ -13,6 +13,7 @@
 # limitations under the License.
 
 import pytest
+import sys
 from unittest import mock
 from google.rpc.status_pb2 import Status
 from datetime import timedelta
@@ -263,6 +264,7 @@ def on_error(exc):
     ],
 )
 @pytest.mark.asyncio
+@pytest.mark.skipif(sys.version_info < (3, 8), reason="AsyncMock requires 3.8")
 async def test_async_streaming_retry_sleep_generator(
     async_sequence, initial, multiplier, maximum, expected
 ):
@@ -294,7 +296,7 @@ async def test_async_streaming_retry_sleep_generator(
     with mock.patch("random.uniform") as mock_uniform:
         # make sleep generator deterministic
         mock_uniform.side_effect = lambda a, b: b
-        with mock.patch("asyncio.sleep") as mock_sleep:
+        with mock.patch("asyncio.sleep", mock.AsyncMock) as mock_sleep:
             it = await async_sequence.attempt_streaming_sequence(
                 name=seq.name, retry=retry
             )

From 2a1bb6b603cf389f33aed2249914b82e6289989d Mon Sep 17 00:00:00 2001
From: Daniel Sanche
Date: Fri, 4 Apr 2025 17:40:35 -0700
Subject: [PATCH 32/33] create mock instance

---
 tests/system/test_retry_streaming_async.py | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/tests/system/test_retry_streaming_async.py b/tests/system/test_retry_streaming_async.py
index 2ac38fa803..39e07d3377 100644
--- a/tests/system/test_retry_streaming_async.py
+++ b/tests/system/test_retry_streaming_async.py
@@ -296,7 +296,7 @@ async def test_async_streaming_retry_sleep_generator(
     with mock.patch("random.uniform") as mock_uniform:
         # make sleep generator deterministic
         mock_uniform.side_effect = lambda a, b: b
-        with mock.patch("asyncio.sleep", mock.AsyncMock) as mock_sleep:
+        with mock.patch("asyncio.sleep", mock.AsyncMock()) as mock_sleep:
             it = await async_sequence.attempt_streaming_sequence(
                 name=seq.name, retry=retry
             )

From 9dc47256c87f8e14396c68559d66d0c680e54ebc Mon Sep 17 00:00:00 2001
From: Daniel Sanche
Date: Fri, 4 Apr 2025 17:56:49 -0700
Subject: [PATCH 33/33] updated bug link

---
 tests/system/test_retry_streaming.py | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/tests/system/test_retry_streaming.py b/tests/system/test_retry_streaming.py
index 0876f1c572..c0327861e8 100644
--- a/tests/system/test_retry_streaming.py
+++ b/tests/system/test_retry_streaming.py
@@ -131,7 +131,7 @@ def test_streaming_transient_retryable_partial_data(sequence):
     TODO:
     Test is currently skipped for rest clients due to known issue:
-    https://github.com/googleapis/gapic-showcase/issues/1377
+    https://github.com/googleapis/gapic-generator-python/issues/2375
     """
     from google.protobuf.duration_pb2 import Duration