324 changes: 180 additions & 144 deletions tests/unit/test_job_retry.py
@@ -24,7 +24,7 @@

from google.cloud.bigquery.client import Client
from google.cloud.bigquery import _job_helpers
from google.cloud.bigquery.retry import DEFAULT_JOB_RETRY
from google.cloud.bigquery.retry import DEFAULT_RETRY, DEFAULT_JOB_RETRY

from .helpers import make_connection

@@ -35,8 +35,7 @@
# - Pass NotFound retry to `result`.
# - Pass BadRequest retry to query, with the value passed to `result` overriding.
@pytest.mark.parametrize("job_retry_on_query", [None, "Query", "Result", "Both"])
@mock.patch("time.sleep")
def test_retry_failed_jobs(sleep, client, job_retry_on_query):
def test_retry_failed_jobs(job_retry_on_query):
"""
Test retry of job failures, as opposed to API-invocation failures.
"""
@@ -53,171 +52,208 @@ def test_retry_failed_jobs(sleep, client, job_retry_on_query):
)

if job_retry_on_query is None:
reason = "rateLimitExceeded"
errs = [{"reason": "rateLimitExceeded"}]
else:
reason = "notFound"

err = dict(reason=reason)
responses = [
dict(status=dict(state="DONE", errors=[err], errorResult=err)),
dict(status=dict(state="DONE", errors=[err], errorResult=err)),
dict(status=dict(state="DONE", errors=[err], errorResult=err)),
dict(status=dict(state="DONE")),
dict(rows=[{"f": [{"v": "1"}]}], totalRows="1"),
]

def api_request(method, path, query_params=None, data=None, **kw):
response = responses.pop(0)
if data:
response["jobReference"] = data["jobReference"]
else:
response["jobReference"] = dict(
jobId=path.split("/")[-1], projectId="PROJECT"
)
return response

conn = client._connection = make_connection()
conn.api_request.side_effect = api_request
errs = [{"reason": "notFound"}]

freezegun.freeze_time(auto_tick_seconds=1)
client = mock.create_autospec(Client)
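    # Note: the autospec mock doesn't carry real function attributes; the retry
    # decorator wraps _call_api with functools.wraps, which copies these onto the
    # wrapper, so they need to be plain values here.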
client._call_api.__name__ = "_call_api"
client._call_api.__qualname__ = "Client._call_api"
client._call_api.__annotations__ = {}
client._call_api.__type_params__ = ()
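    # Simulated jobs.query responses: an incomplete job, then an error carrying
    # the reason under test, then a completed job with four rows.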
client._call_api.side_effect = (
{
"jobReference": {
"projectId": "response-project",
"jobId": "abc",
"location": "response-location",
},
"jobComplete": False,
},
google.api_core.exceptions.InternalServerError("job_retry me", errors=errs),
{
"jobReference": {
"projectId": "response-project",
"jobId": "abc",
"location": "response-location",
},
"jobComplete": True,
"schema": {
"fields": [
{"name": "full_name", "type": "STRING", "mode": "REQUIRED"},
{"name": "age", "type": "INT64", "mode": "NULLABLE"},
],
},
"rows": [
{"f": [{"v": "Whillma Phlyntstone"}, {"v": "27"}]},
{"f": [{"v": "Bhetty Rhubble"}, {"v": "28"}]},
{"f": [{"v": "Phred Phlyntstone"}, {"v": "32"}]},
{"f": [{"v": "Bharney Rhubble"}, {"v": "33"}]},
],
},
)

if job_retry_on_query == "Query":
job_retry = dict(job_retry=retry_notfound)
job_retry = retry_notfound
elif job_retry_on_query == "Both":
# This will be overridden in `result`
job_retry = dict(job_retry=retry_badrequest)
job_retry = retry_badrequest
else:
job_retry = {}
job = client.query("select 1", **job_retry)

orig_job_id = job.job_id
job_retry = (
dict(job_retry=retry_notfound)
if job_retry_on_query in ("Result", "Both")
else {}
)
result = job.result(**job_retry)
assert result.total_rows == 1
assert not responses # We made all the calls we expected to.

# The job adjusts its job id based on the id of the last attempt.
assert job.job_id != orig_job_id
assert job.job_id == conn.mock_calls[3][2]["data"]["jobReference"]["jobId"]
job_retry = None

# We had to sleep three times
assert len(sleep.mock_calls) == 3

# Sleeps are random, but they're always greater than 0
assert min(c[1][0] for c in sleep.mock_calls) > 0

# They're at most 2 * (multiplier**(number of sleeps - 1)) * initial
# The default multiplier is 2
assert max(c[1][0] for c in sleep.mock_calls) <= 8

# We can ask for the result again:
responses = [
dict(rows=[{"f": [{"v": "1"}]}], totalRows="1"),
]
orig_job_id = job.job_id
result = job.result()
assert result.total_rows == 1
assert not responses # We made all the calls we expected to.
rows = _job_helpers.query_and_wait(
client,
query="SELECT 1",
location="request-location",
project="request-project",
job_config=None,
page_size=None,
max_results=None,
retry=DEFAULT_RETRY,
job_retry=job_retry,
)

# We wouldn't (and didn't) fail, because we're dealing with a successful job.
# So the job id hasn't changed.
assert job.job_id == orig_job_id
assert len(list(rows)) == 4


# With job_retry_on_query, we're testing 2 scenarios:
# - Pass None retry to `query`.
# - Pass None retry to `result`.
@pytest.mark.parametrize("job_retry_on_query", ["Query", "Result"])
@mock.patch("time.sleep")
def test_disable_retry_failed_jobs(sleep, client, job_retry_on_query):
def test_disable_retry_failed_jobs(job_retry_on_query):
"""
Test disabling retry of job failures, as opposed to API-invocation failures.
"""
err = dict(reason="rateLimitExceeded")
responses = [dict(status=dict(state="DONE", errors=[err], errorResult=err))] * 3

def api_request(method, path, query_params=None, data=None, **kw):
response = responses.pop(0)
response["jobReference"] = data["jobReference"]
return response

conn = client._connection = make_connection()
conn.api_request.side_effect = api_request

if job_retry_on_query == "Query":
job_retry = dict(job_retry=None)
else:
job_retry = {}
job = client.query("select 1", **job_retry)
freezegun.freeze_time(auto_tick_seconds=1)
client = mock.create_autospec(Client)
client._call_api.__name__ = "_call_api"
client._call_api.__qualname__ = "Client._call_api"
client._call_api.__annotations__ = {}
client._call_api.__type_params__ = ()
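    # Simulated responses: an incomplete job, then a rateLimitExceeded error.
    # With retries disabled, that error should surface to the caller.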
client._call_api.side_effect = (
{
"jobReference": {
"projectId": "response-project",
"jobId": "abc",
"location": "response-location",
},
"jobComplete": False,
},
google.api_core.exceptions.InternalServerError(
"job_retry me", errors=[{"reason": "rateLimitExceeded"}]
),
)

orig_job_id = job.job_id
job_retry = dict(job_retry=None) if job_retry_on_query == "Result" else {}
with pytest.raises(google.api_core.exceptions.Forbidden):
job.result(**job_retry)
rows = _job_helpers.query_and_wait(
client,
query="SELECT 1",
location="request-location",
project="request-project",
job_config=None,
page_size=None,
max_results=None,
retry=None, # Explicitly disable retry
job_retry=None,
)

assert job.job_id == orig_job_id
assert len(sleep.mock_calls) == 0
with pytest.raises(google.api_core.exceptions.InternalServerError):
list(rows) # Raise the last error


@mock.patch("time.sleep")
def test_retry_failed_jobs_after_retry_failed(sleep, client):
def test_retry_failed_jobs_after_retry_failed(client):
"""
If at first you don't succeed, maybe you will later. :)
"""
conn = client._connection = make_connection()

with freezegun.freeze_time("2024-01-01 00:00:00") as frozen_datetime:
err = dict(reason="rateLimitExceeded")

def api_request(method, path, query_params=None, data=None, **kw):
calls = sleep.mock_calls
if calls:
frozen_datetime.tick(delta=datetime.timedelta(seconds=calls[-1][1][0]))
response = dict(status=dict(state="DONE", errors=[err], errorResult=err))
response["jobReference"] = data["jobReference"]
return response

conn.api_request.side_effect = api_request

job = client.query("select 1")
orig_job_id = job.job_id

with pytest.raises(google.api_core.exceptions.RetryError):
job.result()

# We never got a successful job, so the job id never changed:
assert job.job_id == orig_job_id

# We failed because we couldn't succeed after 120 seconds.
# But we can try again:
err2 = dict(reason="backendError") # We also retry on this
responses = [
dict(status=dict(state="DONE", errors=[err2], errorResult=err2)),
dict(status=dict(state="DONE", errors=[err], errorResult=err)),
dict(status=dict(state="DONE", errors=[err2], errorResult=err2)),
dict(status=dict(state="DONE")),
dict(rows=[{"f": [{"v": "1"}]}], totalRows="1"),
]

def api_request(method, path, query_params=None, data=None, **kw):
calls = sleep.mock_calls
frozen_datetime.tick(delta=datetime.timedelta(seconds=calls[-1][1][0]))
response = responses.pop(0)
if data:
response["jobReference"] = data["jobReference"]
else:
response["jobReference"] = dict(
jobId=path.split("/")[-1], projectId="PROJECT"
)
return response

conn.api_request.side_effect = api_request
result = job.result()
assert result.total_rows == 1
assert not responses # We made all the calls we expected to.
assert job.job_id != orig_job_id

freezegun.freeze_time(auto_tick_seconds=1)
client = mock.create_autospec(Client)
client._call_api.__name__ = "_call_api"
client._call_api.__qualname__ = "Client._call_api"
client._call_api.__annotations__ = {}
client._call_api.__type_params__ = ()
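    # Simulated responses for two query_and_wait attempts: the first is expected
    # to end in a RetryError, the second to retry through the remaining errors
    # and succeed.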
client._call_api.side_effect = (
{
"jobReference": {
"projectId": "response-project",
"jobId": "abc",
"location": "response-location",
},
"jobComplete": False,
},
google.api_core.exceptions.InternalServerError(
"job_retry me", errors=[{"reason": "rateLimitExceeded"}]
),
# Responses for subsequent success
{
"jobReference": {
"jobId": "job1",
"projectId": "project",
"location": "location",
},
"jobComplete": False,
},
google.api_core.exceptions.BadRequest(
"job_retry me", errors=[{"reason": "backendError"}]
),
google.api_core.exceptions.InternalServerError(
"job_retry me", errors=[{"reason": "rateLimitExceeded"}]
),
google.api_core.exceptions.BadRequest(
"job_retry me", errors=[{"reason": "backendError"}]
),
{
"jobReference": {
"projectId": "response-project",
"jobId": "abc",
"location": "response-location",
},
"jobComplete": True,
"schema": {
"fields": [
{"name": "full_name", "type": "STRING", "mode": "REQUIRED"},
{"name": "age", "type": "INT64", "mode": "NULLABLE"},
],
},
"rows": [
{"f": [{"v": "Whillma Phlyntstone"}, {"v": "27"}]},
{"f": [{"v": "Bhetty Rhubble"}, {"v": "28"}]},
{"f": [{"v": "Phred Phlyntstone"}, {"v": "32"}]},
{"f": [{"v": "Bharney Rhubble"}, {"v": "33"}]},
],
},
)

rows = _job_helpers.query_and_wait(
client,
query="SELECT 1",
location="request-location",
project="request-project",
job_config=None,
page_size=None,
max_results=None,
retry=DEFAULT_RETRY,
job_retry=DEFAULT_JOB_RETRY,
)
# TODO: Add a separate test that verifies the job is retried until it times out.
with pytest.raises(google.api_core.exceptions.RetryError):
list(rows) # Trigger the initial retry failure

# Second attempt with successful retries
rows = _job_helpers.query_and_wait(
client,
query="SELECT 1",
location="request-location",
project="request-project",
job_config=None,
page_size=None,
max_results=None,
retry=DEFAULT_RETRY,
job_retry=DEFAULT_RETRY,
)

assert len(list(rows)) == 4


def test_raises_on_job_retry_on_query_with_non_retryable_jobs(client):
@@ -301,7 +337,7 @@ def test_query_and_wait_retries_job_for_DDL_queries():
job_config=None,
page_size=None,
max_results=None,
retry=DEFAULT_JOB_RETRY,
retry=DEFAULT_RETRY,
job_retry=DEFAULT_JOB_RETRY,
)
assert len(list(rows)) == 4