Skip to content

chore(tests): add streaming retry showcase tests #1764

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 36 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
36 commits
Select commit Hold shift + click to select a range
fa98f3b
updated showcase version for streaming retry tests
daniel-sanche Sep 11, 2023
c6e28cd
added successful test
daniel-sanche Sep 11, 2023
8398997
added test for non-retried exception
daniel-sanche Sep 11, 2023
193e506
added test with transient errors
daniel-sanche Sep 11, 2023
f0d55da
added test for retryable with partial data
daniel-sanche Sep 11, 2023
d562aeb
added retry test with eventual timeout
daniel-sanche Sep 12, 2023
e25866e
added tests for on_error and backoff
daniel-sanche Sep 12, 2023
5ed69f5
added async streaming tests
daniel-sanche Sep 12, 2023
1acfa90
added retry to success test
daniel-sanche Sep 12, 2023
35ae405
ran black
daniel-sanche Sep 12, 2023
899f5bc
added awaits to wrapped sequences
daniel-sanche Sep 12, 2023
3868385
Merge branch 'main' into streaming-retry-showcase-tests
daniel-sanche Sep 12, 2023
2458b16
🦉 Updates from OwlBot post-processor
gcf-owl-bot[bot] Sep 12, 2023
66f80c2
🦉 Updates from OwlBot post-processor
gcf-owl-bot[bot] Sep 12, 2023
ce342f4
ran autopep8
daniel-sanche Sep 12, 2023
0661c3c
ran autopep on conftest
daniel-sanche Sep 12, 2023
c156cfd
import mock from unittest
daniel-sanche Sep 12, 2023
114f447
updated showcase version in github actions
daniel-sanche Sep 12, 2023
c8a520f
changing stream test
daniel-sanche Oct 6, 2023
90a8c43
Merge branch 'main' into streaming-retry-showcase-tests
daniel-sanche Jun 25, 2024
971e2f7
Merge branch 'main' into streaming-retry-showcase-tests
daniel-sanche Apr 4, 2025
ba9c971
ran blacken
daniel-sanche Apr 4, 2025
cbb8960
use StreamingRetry class
daniel-sanche Apr 4, 2025
31031fe
added back sequence fixture
daniel-sanche Apr 4, 2025
be1c47a
fixed import
daniel-sanche Apr 4, 2025
c95aded
ran blacken
daniel-sanche Apr 4, 2025
3d98926
fixed import
daniel-sanche Apr 4, 2025
a29c9be
added skipif for api_core version
daniel-sanche Apr 4, 2025
8ee624d
changed test sequencing
daniel-sanche Apr 4, 2025
657ba04
support rest_asyncio
daniel-sanche Apr 4, 2025
db86f1e
skip if transport is missing
daniel-sanche Apr 5, 2025
3cf5841
skip known failing test
daniel-sanche Apr 5, 2025
4a58868
fix comparison
daniel-sanche Apr 5, 2025
94924ae
skip AsyncMock test in 3.7
daniel-sanche Apr 5, 2025
2a1bb6b
create mock instance
daniel-sanche Apr 5, 2025
9dc4725
updated bug link
daniel-sanche Apr 5, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions noxfile.py
Original file line number Diff line number Diff line change
Expand Up @@ -322,6 +322,7 @@ def showcase_library(
f"google/showcase/v1beta1/echo.proto",
f"google/showcase/v1beta1/identity.proto",
f"google/showcase/v1beta1/messaging.proto",
f"google/showcase/v1beta1/sequence.proto",
)
session.run(
*cmd_tup,
Expand Down
32 changes: 32 additions & 0 deletions tests/system/conftest.py
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@
import google.auth
from google.auth import credentials as ga_credentials
from google.showcase import EchoClient
from google.showcase import SequenceServiceClient
from google.showcase import IdentityClient
from google.showcase import MessagingClient

Expand All @@ -41,6 +42,7 @@
import asyncio
from google.showcase import EchoAsyncClient
from google.showcase import IdentityAsyncClient
from google.showcase import SequenceServiceAsyncClient

try:
from google.showcase_v1beta1.services.echo.transports import (
Expand All @@ -59,6 +61,14 @@
HAS_ASYNC_REST_IDENTITY_TRANSPORT = True
except:
HAS_ASYNC_REST_IDENTITY_TRANSPORT = False
try:
from google.showcase_v1beta1.services.seuence.transports import (
AsyncSequenceServiceRestTransport,
)

HAS_ASYNC_REST_SEQUENCE_TRANSPORT = True
except:
HAS_ASYNC_REST_SEQUENCE_TRANSPORT = False

# TODO: use async auth anon credentials by default once the minimum version of google-auth is upgraded.
# See related issue: https://github.com/googleapis/gapic-generator-python/issues/2107.
Expand Down Expand Up @@ -109,6 +119,21 @@ def async_identity(use_mtls, request, event_loop):
credentials=async_anonymous_credentials(),
)

@pytest.fixture(params=["grpc_asyncio", "rest_asyncio"])
def async_sequence(use_mtls, request, event_loop):
transport = request.param
if transport == "rest_asyncio" and not HAS_ASYNC_REST_SEQUENCE_TRANSPORT:
pytest.skip("Skipping test with async rest.")
return construct_client(
SequenceServiceAsyncClient,
use_mtls,
transport_name=transport,
channel_creator=(
aio.insecure_channel if request.param == "grpc_asyncio" else None
),
credentials=async_anonymous_credentials(),
)


dir = os.path.dirname(__file__)
with open(os.path.join(dir, "../cert/mtls.crt"), "rb") as fh:
Expand Down Expand Up @@ -247,6 +272,13 @@ def identity(use_mtls, request):
return construct_client(IdentityClient, use_mtls, transport_name=request.param)


@pytest.fixture(params=["grpc", "rest"])
def sequence(use_mtls, request):
return construct_client(
SequenceServiceClient, use_mtls, transport_name=request.param
)


@pytest.fixture(params=["grpc", "rest"])
def messaging(use_mtls, request):
return construct_client(MessagingClient, use_mtls, transport_name=request.param)
Expand Down
314 changes: 314 additions & 0 deletions tests/system/test_retry_streaming.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,314 @@
# Copyright 2019 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# https://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

import pytest
from unittest import mock
from google.rpc.status_pb2 import Status
from datetime import timedelta
from google.api_core import retry as retries
from google.api_core import exceptions as core_exceptions
from google.api_core.version import __version__ as api_core_version

if [int(n) for n in api_core_version.split(".")] < [2, 16, 0]:
pytest.skip("streaming retries requires api_core v2.16.0+", allow_module_level=True)


def _code_from_exc(exc):
"""
return the grpc code from an exception
"""
return exc.grpc_status_code.value[0]


def test_streaming_retry_success(sequence):
"""
Test a stream with a sigle success response
"""
retry = retries.StreamingRetry(predicate=retries.if_exception_type())
content = ["hello", "world"]
seq = sequence.create_streaming_sequence(
streaming_sequence={
"name": __name__,
"content": " ".join(content),
# single response with entire stream content
"responses": [{"status": Status(code=0), "response_index": len(content)}],
}
)
it = sequence.attempt_streaming_sequence(name=seq.name, retry=retry)
results = [pb.content for pb in it]
assert results == content
# verify streaming report
report = sequence.get_streaming_sequence_report(
name=f"{seq.name}/streamingSequenceReport"
)
assert len(report.attempts) == 1
assert report.attempts[0].status == Status(code=0)


def test_streaming_non_retryable_error(sequence):
"""
Test a retryable stream failing with non-retryable error
"""
retry = retries.StreamingRetry(predicate=retries.if_exception_type())
content = ["hello", "world"]
error = Status(
code=_code_from_exc(core_exceptions.ServiceUnavailable),
message="expected error",
)
seq = sequence.create_streaming_sequence(
streaming_sequence={
"name": __name__,
"content": " ".join(content),
"responses": [{"status": error, "response_index": 0}],
}
)
with pytest.raises(core_exceptions.ServiceUnavailable):
it = sequence.attempt_streaming_sequence(name=seq.name, retry=retry)
next(it)
# verify streaming report
report = sequence.get_streaming_sequence_report(
name=f"{seq.name}/streamingSequenceReport"
)
assert len(report.attempts) == 1
assert report.attempts[0].status == error


def test_streaming_transient_retryable(sequence):
"""
Server returns a retryable error a number of times before success.
Retryable errors should not be presented to the end user.
"""
retry = retries.StreamingRetry(
predicate=retries.if_exception_type(core_exceptions.ServiceUnavailable),
initial=0,
maximum=0,
timeout=1,
)
content = ["hello", "world"]
error = Status(
code=_code_from_exc(core_exceptions.ServiceUnavailable),
message="transient error",
)
responses = [{"status": error, "response_index": 0} for _ in range(3)] + [
{"status": Status(code=0), "response_index": len(content)}
]
seq = sequence.create_streaming_sequence(
streaming_sequence={
"name": __name__,
"content": " ".join(content),
"responses": responses,
}
)
it = sequence.attempt_streaming_sequence(name=seq.name, retry=retry)
results = [pb.content for pb in it]
assert results == content
# verify streaming report
report = sequence.get_streaming_sequence_report(
name=f"{seq.name}/streamingSequenceReport"
)
assert len(report.attempts) == 4
assert report.attempts[0].status == error
assert report.attempts[1].status == error
assert report.attempts[2].status == error
assert report.attempts[3].status == Status(code=0)


def test_streaming_transient_retryable_partial_data(sequence):
"""
Server stream yields some data before failing with a retryable error a number of times before success.
Wrapped stream should contain data from all attempts

TODO:
Test is currently skipped for rest clients due to known issue:
https://github.com/googleapis/gapic-generator-python/issues/2375
"""
from google.protobuf.duration_pb2 import Duration

if isinstance(sequence.transport, type(sequence).get_transport_class("rest")):
pytest.skip("Skipping due to known streaming issue in rest client")

retry = retries.StreamingRetry(
predicate=retries.if_exception_type(core_exceptions.ServiceUnavailable),
initial=0,
maximum=0,
)
content = ["hello", "world"]
error = Status(
code=_code_from_exc(core_exceptions.ServiceUnavailable),
message="transient error",
)
transient_error_list = [
{"status": error, "response_index": 1, "delay": Duration(seconds=30)}
] * 3

responses = transient_error_list + [
{"status": Status(code=0), "response_index": len(content)}
]
seq = sequence.create_streaming_sequence(
streaming_sequence={
"name": __name__,
"content": " ".join(content),
"responses": responses,
}
)
it = sequence.attempt_streaming_sequence(name=seq.name, retry=retry)
results = [pb.content for pb in it]
assert results == ["hello", "hello", "hello", "hello", "world"]
# verify streaming report
report = sequence.get_streaming_sequence_report(
name=f"{seq.name}/streamingSequenceReport"
)
assert len(report.attempts) == 4
assert report.attempts[0].status == error
assert report.attempts[1].status == error
assert report.attempts[2].status == error
assert report.attempts[3].status == Status(code=0)


def test_streaming_retryable_eventual_timeout(sequence):
"""
Server returns a retryable error a number of times before reaching timeout.
Should raise a retry error.
"""
retry = retries.StreamingRetry(
predicate=retries.if_exception_type(core_exceptions.ServiceUnavailable),
initial=0,
maximum=0,
timeout=0.35,
)
content = ["hello", "world"]
error = Status(
code=_code_from_exc(core_exceptions.ServiceUnavailable),
message="transient error",
)
transient_error_list = [
{"status": error, "response_index": 1, "delay": timedelta(seconds=0.15)}
] * 10
responses = transient_error_list + [
{"status": Status(code=0), "response_index": len(content)}
]
seq = sequence.create_streaming_sequence(
streaming_sequence={
"name": __name__,
"content": " ".join(content),
"responses": responses,
}
)
with pytest.raises(core_exceptions.RetryError) as exc_info:
it = sequence.attempt_streaming_sequence(name=seq.name, retry=retry)
[pb.content for pb in it]
cause = exc_info.value.__cause__
assert isinstance(cause, core_exceptions.ServiceUnavailable)
# verify streaming report
report = sequence.get_streaming_sequence_report(
name=f"{seq.name}/streamingSequenceReport"
)
assert len(report.attempts) == 3
assert report.attempts[0].status == error
assert report.attempts[1].status == error
assert report.attempts[2].status == error


def test_streaming_retry_on_error(sequence):
"""
on_error should be called for all retryable errors as they are encountered
"""
encountered_excs = []

def on_error(exc):
encountered_excs.append(exc)

retry = retries.StreamingRetry(
predicate=retries.if_exception_type(
core_exceptions.ServiceUnavailable, core_exceptions.GatewayTimeout
),
initial=0,
maximum=0,
on_error=on_error,
)
content = ["hello", "world"]
errors = [
core_exceptions.ServiceUnavailable,
core_exceptions.DeadlineExceeded,
core_exceptions.NotFound,
]
responses = [{"status": Status(code=_code_from_exc(exc))} for exc in errors]
seq = sequence.create_streaming_sequence(
streaming_sequence={
"name": __name__,
"content": " ".join(content),
"responses": responses,
}
)
with pytest.raises(core_exceptions.NotFound):
it = sequence.attempt_streaming_sequence(name=seq.name, retry=retry)
[pb.content for pb in it]
# on_error should have been called on the first two errors, but not the terminal one
assert len(encountered_excs) == 2
assert isinstance(encountered_excs[0], core_exceptions.ServiceUnavailable)
# rest raises superclass GatewayTimeout in place of DeadlineExceeded
assert isinstance(
encountered_excs[1],
(core_exceptions.DeadlineExceeded, core_exceptions.GatewayTimeout),
)


@pytest.mark.parametrize(
"initial,multiplier,maximum,expected",
[
(0.1, 1.0, 0.5, [0.1, 0.1, 0.1]),
(0, 2.0, 0.5, [0, 0]),
(0.1, 2.0, 0.5, [0.1, 0.2, 0.4, 0.5, 0.5]),
(1, 1.5, 5, [1, 1.5, 2.25, 3.375, 5, 5]),
],
)
def test_streaming_retry_sleep_generator(
sequence, initial, multiplier, maximum, expected
):
"""
should be able to pass in sleep generator to control backoff
"""
retry = retries.StreamingRetry(
predicate=retries.if_exception_type(core_exceptions.ServiceUnavailable),
initial=initial,
maximum=maximum,
multiplier=multiplier,
)
content = ["hello", "world"]
error = Status(
code=_code_from_exc(core_exceptions.ServiceUnavailable),
message="transient error",
)
transient_error_list = [{"status": error}] * len(expected)
responses = transient_error_list + [
{"status": Status(code=0), "response_index": len(content)}
]
seq = sequence.create_streaming_sequence(
streaming_sequence={
"name": __name__,
"content": " ".join(content),
"responses": responses,
}
)
with mock.patch("random.uniform") as mock_uniform:
# make sleep generator deterministic
mock_uniform.side_effect = lambda a, b: b
with mock.patch("time.sleep") as mock_sleep:
it = sequence.attempt_streaming_sequence(name=seq.name, retry=retry)
[pb.content for pb in it]
assert mock_sleep.call_count == len(expected)
# ensure that sleep times match expected
assert mock_sleep.call_args_list == [
mock.call(sleep_time) for sleep_time in expected
]
Loading
Loading