
Commit 66bd8c4

glasntaman-ebay and aman-ebay authored
feat(dataproc): Add Spark Job to Cluster sample (#13459) (#13469)
* Create submit_spark_job_to_driver_node_group_cluster.py
* add a system test, copied from instantiate_line_workflow, create cluster tests
* use create cluster sample to create cluster
* black, isort
* ensure job check is specified in output
* create node cluster manually

Co-authored-by: aman-ebay <amancuso@google.com>
1 parent 4cc2cb6 commit 66bd8c4

File tree

2 files changed: +189 −0 lines changed
submit_spark_job_to_driver_node_group_cluster.py
Lines changed: 101 additions & 0 deletions
@@ -0,0 +1,101 @@
#!/usr/bin/env python

# Copyright 2025 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

# This sample walks a user through submitting a Spark job to a
# Dataproc driver node group cluster using the Dataproc
# client library.

# Usage:
#     python submit_spark_job_to_driver_node_group_cluster.py \
#         --project_id <PROJECT_ID> --region <REGION> \
#         --cluster_name <CLUSTER_NAME>

# [START dataproc_submit_spark_job_to_driver_node_group_cluster]

import re

from google.cloud import dataproc_v1 as dataproc
from google.cloud import storage


def submit_job(project_id: str, region: str, cluster_name: str) -> None:
    """Submits a Spark job to a Dataproc cluster with a driver node group and prints the job output.

    Args:
        project_id: The Google Cloud project ID.
        region: The Dataproc region where the cluster is located.
        cluster_name: The name of the Dataproc cluster.
    """
    # Create the job client.
    with dataproc.JobControllerClient(
        client_options={"api_endpoint": f"{region}-dataproc.googleapis.com:443"}
    ) as job_client:

        # Resources reserved on the driver node group for this job's driver.
        driver_scheduling_config = dataproc.DriverSchedulingConfig(
            memory_mb=2048,  # Example memory in MB
            vcores=2,  # Example number of vcores
        )

        # Create the job config. 'jar_file_uris' entries can also be
        # Cloud Storage (gs://) URIs.
        job = {
            "placement": {"cluster_name": cluster_name},
            "spark_job": {
                "main_class": "org.apache.spark.examples.SparkPi",
                "jar_file_uris": [
                    "file:///usr/lib/spark/examples/jars/spark-examples.jar"
                ],
                "args": ["1000"],
            },
            "driver_scheduling_config": driver_scheduling_config,
        }

        operation = job_client.submit_job_as_operation(
            request={"project_id": project_id, "region": region, "job": job}
        )

        response = operation.result()

        # Dataproc job output is saved to the Cloud Storage bucket
        # allocated to the job. Use a regex to obtain the bucket and blob info.
        matches = re.match("gs://(.*?)/(.*)", response.driver_output_resource_uri)
        if not matches:
            raise ValueError(
                f"Could not parse driver output URI: {response.driver_output_resource_uri}"
            )

        output = (
            storage.Client()
            .get_bucket(matches.group(1))
            .blob(f"{matches.group(2)}.000000000")
            .download_as_bytes()
            .decode("utf-8")
        )

        print(f"Job finished successfully: {output}")

# [END dataproc_submit_spark_job_to_driver_node_group_cluster]


if __name__ == "__main__":
    import argparse

    parser = argparse.ArgumentParser(
        description="Submits a Spark job to a Dataproc driver node group cluster."
    )
    parser.add_argument(
        "--project_id", help="The Google Cloud project ID.", required=True
    )
    parser.add_argument(
        "--region", help="The Dataproc region where the cluster is located.", required=True
    )
    parser.add_argument(
        "--cluster_name", help="The name of the Dataproc cluster.", required=True
    )

    args = parser.parse_args()
    submit_job(args.project_id, args.region, args.cluster_name)
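
Side note: the Job returned by operation.result() carries a job ID, so the same client can look the job up again later. The lines below are a minimal sketch of that, not part of the sample; the variable names and the status check are illustrative assumptions.

    # Sketch only: re-fetch the submitted job by ID, assuming `response`
    # is the Job returned by operation.result() above.
    job = job_client.get_job(
        request={
            "project_id": project_id,
            "region": region,
            "job_id": response.reference.job_id,
        }
    )
    print(job.status.state)  # e.g. JobStatus.State.DONE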
Lines changed: 88 additions & 0 deletions
@@ -0,0 +1,88 @@
# Copyright 2020 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

import os
import subprocess
import uuid

import backoff
from google.api_core.exceptions import (
    Aborted,
    InternalServerError,
    NotFound,
    ServiceUnavailable,
)
from google.cloud import dataproc_v1 as dataproc

import submit_spark_job_to_driver_node_group_cluster

PROJECT_ID = os.environ["GOOGLE_CLOUD_PROJECT"]
REGION = "us-central1"
CLUSTER_NAME = f"py-ss-test-{str(uuid.uuid4())}"

cluster_client = dataproc.ClusterControllerClient(
    client_options={"api_endpoint": f"{REGION}-dataproc.googleapis.com:443"}
)


@backoff.on_exception(backoff.expo, Exception, max_tries=5)
def teardown():
    try:
        operation = cluster_client.delete_cluster(
            request={
                "project_id": PROJECT_ID,
                "region": REGION,
                "cluster_name": CLUSTER_NAME,
            }
        )
        # Wait for cluster to delete
        operation.result()
    except NotFound:
        print("Cluster already deleted")


@backoff.on_exception(
    backoff.expo,
    (
        InternalServerError,
        ServiceUnavailable,
        Aborted,
    ),
    max_tries=5,
)
def test_workflows(capsys):
    # Setup driver node group cluster. TODO: cleanup b/424371877
    command = f"""gcloud dataproc clusters create {CLUSTER_NAME} \
        --region {REGION} \
        --project {PROJECT_ID} \
        --driver-pool-size=1 \
        --driver-pool-id=pytest"""

    output = subprocess.run(
        command,
        capture_output=True,
        shell=True,
        check=True,
    )
    print(output)

    # Wrapper function for the client library function
    submit_spark_job_to_driver_node_group_cluster.submit_job(
        PROJECT_ID, REGION, CLUSTER_NAME
    )

    out, _ = capsys.readouterr()
    assert "Job finished successfully" in out

    # cluster deleted in teardown()
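
Because gcloud dataproc clusters create runs synchronously here, the test can submit the job right after the subprocess call returns. If the cluster were instead created with --async, a readiness check along these lines would be needed first; this is a sketch only, using the cluster_client already constructed above, and is not part of the test.

    # Sketch only: confirm the cluster is RUNNING before submitting a job
    # (needed only if the cluster were created asynchronously).
    cluster = cluster_client.get_cluster(
        request={
            "project_id": PROJECT_ID,
            "region": REGION,
            "cluster_name": CLUSTER_NAME,
        }
    )
    assert cluster.status.state == dataproc.ClusterStatus.State.RUNNING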
