Skip to content

Commit 7dbec66

Browse files
norlovetelpirion
andauthored
feat(BigQuery): Programmatic retries for a continuous query (#13201)
* Create README.md * Create programmatic-retries.py * Delete bigquery/continuous-queries/retries directory * Update programmatic-retries.py * Create requirements.txt * Update programmatic-retries.py * Create requirements-test.txt * Update requirements.txt * Update programmatic-retries.py * Update programmatic-retries.py * Update requirements-test.txt * Rename programmatic-retries.py to programmatic_retries.py * Create programmatic_retries_test.py * Update requirements-test.txt * Update requirements.txt * Update programmatic_retries_test.py * Update programmatic_retries_test.py * Update programmatic_retries.py * Update programmatic_retries_test.py * Update programmatic_retries.py * Update programmatic_retries_test.py * Update programmatic_retries_test.py --------- Co-authored-by: Eric Schmidt <erschmid@google.com>
1 parent 4491fd5 commit 7dbec66

File tree

4 files changed

+237
-0
lines changed

4 files changed

+237
-0
lines changed
Lines changed: 149 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,149 @@
1+
# Copyright 2025 Google LLC
2+
#
3+
# Licensed under the Apache License, Version 2.0 (the "License");
4+
# you may not use this file except in compliance with the License.
5+
# You may obtain a copy of the License at
6+
#
7+
# https://www.apache.org/licenses/LICENSE-2.0
8+
#
9+
# Unless required by applicable law or agreed to in writing, software
10+
# distributed under the License is distributed on an "AS IS" BASIS,
11+
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
# See the License for the specific language governing permissions and
13+
# limitations under the License.
14+
15+
# This code sample demonstrates one possible approach to automating query retry.
16+
# Important things to consider when you retry a failed continuous query include the following:
17+
# - Whether reprocessing some amount of data processed by the previous query before it failed is tolerable.
18+
# - How to handle limiting retries or using exponential backoff.
19+
20+
# Make sure you provide your SERVICE_ACCOUNT and CUSTOM_JOB_ID_PREFIX.
21+
22+
# [START functions_bigquery_continuous_queries_programmatic_retry]
23+
import base64
24+
import json
25+
import logging
26+
import re
27+
import uuid
28+
29+
import google.auth
30+
import google.auth.transport.requests
31+
import requests
32+
33+
34+
def retry_continuous_query(event, context):
    """Retry a failed BigQuery continuous query from a Pub/Sub log message.

    Triggered by a Pub/Sub message that carries a Cloud Logging entry for a
    failed continuous-query job. The function extracts the original SQL,
    rewinds its APPENDS start timestamp to the failed job's end time (so no
    data between the failure and the retry is skipped), and submits a new
    continuous query job through the BigQuery REST API.

    Args:
        event: Pub/Sub event payload; ``event["data"]`` is the
            base64-encoded JSON log entry.
        context: Cloud Functions event metadata (unused).

    Returns:
        None. Outcomes are reported via ``logging``.
    """
    logging.info("Cloud Function started.")

    if "data" not in event:
        logging.info("No data in Pub/Sub message.")
        return

    try:
        # [START functions_bigquery_retry_decode]
        # Decode and parse the Pub/Sub message data
        log_entry = json.loads(base64.b64decode(event["data"]).decode("utf-8"))
        # [END functions_bigquery_retry_decode]

        # [START functions_bigquery_retry_extract_query]
        # Extract the SQL query and other necessary data
        proto_payload = log_entry.get("protoPayload", {})
        metadata = proto_payload.get("metadata", {})
        job_change = metadata.get("jobChange", {})
        job = job_change.get("job", {})
        job_config = job.get("jobConfig", {})
        query_config = job_config.get("queryConfig", {})
        sql_query = query_config.get("query")
        job_stats = job.get("jobStats", {})
        end_timestamp = job_stats.get("endTime")
        failed_job_id = job.get("jobName")
        # [END functions_bigquery_retry_extract_query]

        # Check if required fields are missing
        if not all([sql_query, failed_job_id, end_timestamp]):
            logging.error("Required fields missing from log entry.")
            return

        logging.info(f"Retrying failed job: {failed_job_id}")

        # [START functions_bigquery_retry_adjust_timestamp]
        # Adjust the timestamp in the SQL query so the retried query resumes
        # from the point where the failed job stopped processing.
        timestamp_match = re.search(
            r"\s*TIMESTAMP\(('.*?')\)(\s*\+ INTERVAL 1 MICROSECOND)?", sql_query
        )

        if timestamp_match:
            # The query already pins an explicit start timestamp: replace it
            # with the failed job's end time.
            original_timestamp = timestamp_match.group(1)
            new_timestamp = f"'{end_timestamp}'"
            sql_query = sql_query.replace(original_timestamp, new_timestamp)
        elif "CURRENT_TIMESTAMP() - INTERVAL 10 MINUTE" in sql_query:
            # The query used a relative start: swap in an absolute timestamp
            # just past the failed job's end time.
            new_timestamp = f"TIMESTAMP('{end_timestamp}') + INTERVAL 1 MICROSECOND"
            sql_query = sql_query.replace(
                "CURRENT_TIMESTAMP() - INTERVAL 10 MINUTE", new_timestamp
            )
        # [END functions_bigquery_retry_adjust_timestamp]

        # [START functions_bigquery_retry_api_call]
        # Get access token
        credentials, project = google.auth.default(
            scopes=["https://www.googleapis.com/auth/cloud-platform"]
        )
        request = google.auth.transport.requests.Request()
        credentials.refresh(request)
        access_token = credentials.token

        # API endpoint
        url = f"https://bigquery.googleapis.com/bigquery/v2/projects/{project}/jobs"

        # Request headers
        headers = {
            "Authorization": f"Bearer {access_token}",
            "Content-Type": "application/json",
        }

        # Generate a random UUID
        random_suffix = str(uuid.uuid4())[:8]  # Take the first 8 characters of the UUID

        # Combine the prefix and random suffix
        job_id = f"CUSTOM_JOB_ID_PREFIX{random_suffix}"

        # Request payload
        data = {
            "configuration": {
                "query": {
                    "query": sql_query,
                    "useLegacySql": False,
                    "continuous": True,
                    "connectionProperties": [
                        {"key": "service_account", "value": "SERVICE_ACCOUNT"}
                    ],
                    # ... other query parameters ...
                },
                "labels": {"bqux_job_id_prefix": "CUSTOM_JOB_ID_PREFIX"},
            },
            "jobReference": {
                "projectId": project,
                "jobId": job_id,  # Use the generated job ID here
            },
        }

        # Make the API request. A bounded timeout keeps a hung HTTP request
        # from stalling the function until its own execution deadline.
        response = requests.post(url, headers=headers, json=data, timeout=60)
        # [END functions_bigquery_retry_api_call]

        # [START functions_bigquery_retry_handle_response]
        # Handle the response
        if response.status_code == 200:
            logging.info("Query job successfully created.")
        else:
            logging.error(f"Error creating query job: {response.text}")
        # [END functions_bigquery_retry_handle_response]

    except Exception as e:
        logging.error(
            f"Error processing log entry or retrying query: {e}", exc_info=True
        )

    logging.info("Cloud Function finished.")
147+
148+
149+
# [END functions_bigquery_continuous_queries_programmatic_retry]
Lines changed: 81 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,81 @@
1+
# Copyright 2025 Google LLC
2+
#
3+
# Licensed under the Apache License, Version 2.0 (the "License");
4+
# you may not use this file except in compliance with the License.
5+
# You may obtain a copy of the License at
6+
#
7+
# https://www.apache.org/licenses/LICENSE-2.0
8+
#
9+
# Unless required by applicable law or agreed to in writing, software
10+
# distributed under the License is distributed on an "AS IS" BASIS,
11+
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
# See the License for the specific language governing permissions and
13+
# limitations under the License.
14+
15+
import base64
16+
import json
17+
from unittest.mock import Mock, patch
18+
import uuid
19+
20+
# The function under test lives in programmatic_retries.py alongside this file.
21+
import programmatic_retries
22+
23+
24+
@patch("programmatic_retries.requests.post")
@patch("programmatic_retries.google.auth.default")
@patch("uuid.uuid4")
def test_retry_success(mock_uuid, mock_auth_default, mock_requests_post):
    """Happy path: a failed-job log entry produces one jobs.insert call
    whose SQL has been rewound to the failed job's end time and whose job ID
    carries the expected prefix and deterministic suffix."""
    # Pin the UUID so the generated job-ID suffix is predictable.
    mock_uuid.return_value = uuid.UUID("12345678-1234-5678-1234-567812345678")

    # Stub Google auth: a fake credential object and project ID.
    fake_credentials = Mock()
    fake_credentials.token = "test_token"
    mock_auth_default.return_value = (fake_credentials, "test_project")

    # The BigQuery API answers with success.
    ok_response = Mock()
    ok_response.status_code = 200
    mock_requests_post.return_value = ok_response

    # Build a log entry mimicking a failed continuous query.
    end_time = "2025-03-06T10:00:00Z"
    sql_query = "SELECT * FROM APPENDS(TABLE `test.table`, CURRENT_TIMESTAMP() - INTERVAL 10 MINUTE) WHERE TRUE"

    failed_job_id = "projects/test_project/jobs/failed_job_123"

    job = {
        "jobConfig": {"queryConfig": {"query": sql_query}},
        "jobStats": {"endTime": end_time},
        "jobName": failed_job_id,
    }
    log_entry = {"protoPayload": {"metadata": {"jobChange": {"job": job}}}}

    # Wrap the log entry as a base64-encoded Pub/Sub message.
    event = {
        "data": base64.b64encode(json.dumps(log_entry).encode("utf-8")).decode("utf-8")
    }

    # Invoke the Cloud Function under test.
    programmatic_retries.retry_continuous_query(event, None)

    # Inspect the JSON body sent to the BigQuery API.
    payload = mock_requests_post.call_args[1]["json"]
    new_query = payload["configuration"]["query"]["query"]
    print(f"\nNew SQL Query:\n{new_query}\n")

    # Exactly one job insertion, with the rewound timestamp and prefixed ID.
    mock_requests_post.assert_called_once()
    assert end_time in new_query
    assert "CUSTOM_JOB_ID_PREFIX12345678" in payload["jobReference"]["jobId"]
Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,3 @@
1+
pytest==8.3.5
2+
google-auth==2.38.0
3+
requests==2.32.3
Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,4 @@
1+
functions-framework==3.8.2
2+
google-cloud-bigquery==3.30.0
3+
google-auth==2.38.0
4+
requests==2.32.3

0 commit comments

Comments
 (0)