
Commit 25db277

feat: Add ability to run tasks on dataproc. (#948)
* Support gcs dirs in rsync
* ws
* Add create dataproc cluster task
* add dataproc
* ruff
* requirements
* still struggling
* Gencode refactor to remove gcs
* bump reqs
* Run dataproc job
* lib
* running
* merge requirements
* Flip'em
* Better exception handling
* Cleaner approach if less generalizable
* write a test
* Fix tests
* lint
* Add test for success
* refactor to use a base class... better for adding support for multiple jobs
* cleanup
* ruff
* Fix missing mock
* Fix flapping test
* pr comments
1 parent ddf867a commit 25db277

8 files changed (+392, -21 lines)

requirements.txt

Lines changed: 1 addition & 1 deletion
@@ -2,7 +2,7 @@
 # This file is autogenerated by pip-compile with Python 3.10
 # by the following command:
 #
-# pip-compile --resolver=backtracking requirements.in
+# pip-compile requirements.in
 #
 aiodns==2.0.0
     # via hail
v03_pipeline/lib/tasks/dataproc/base_run_job_on_dataproc.py

Lines changed: 106 additions & 0 deletions
@@ -0,0 +1,106 @@
+import time
+
+import google.api_core.exceptions
+import luigi
+from google.cloud import dataproc_v1 as dataproc
+
+from v03_pipeline.lib.logger import get_logger
+from v03_pipeline.lib.model import Env
+from v03_pipeline.lib.tasks.base.base_loading_pipeline_params import (
+    BaseLoadingPipelineParams,
+)
+from v03_pipeline.lib.tasks.dataproc.create_dataproc_cluster import (
+    CreateDataprocClusterTask,
+)
+from v03_pipeline.lib.tasks.dataproc.misc import get_cluster_name, to_kebab_str_args
+
+DONE_STATE = 'DONE'
+ERROR_STATE = 'ERROR'
+SEQR_PIPELINE_RUNNER_BUILD = f'gs://seqr-pipeline-runner-builds/{Env.DEPLOYMENT_TYPE}/{Env.PIPELINE_RUNNER_APP_VERSION}'
+TIMEOUT_S = 172800  # 2 days
+
+logger = get_logger(__name__)
+
+
+@luigi.util.inherits(BaseLoadingPipelineParams)
+class BaseRunJobOnDataprocTask(luigi.Task):
+    def __init__(self, *args, **kwargs):
+        super().__init__(*args, **kwargs)
+        self.client = dataproc.JobControllerClient(
+            client_options={
+                'api_endpoint': f'{Env.GCLOUD_REGION}-dataproc.googleapis.com:443',
+            },
+        )
+
+    @property
+    def task_name(self):
+        return self.get_task_family().split('.')[-1]
+
+    @property
+    def job_id(self):
+        return f'{self.task_name}-{self.run_id}'
+
+    def requires(self) -> [luigi.Task]:
+        return [self.clone(CreateDataprocClusterTask)]
+
+    def complete(self) -> bool:
+        if not self.dataset_type.requires_dataproc:
+            msg = f'{self.dataset_type} should not require a dataproc job'
+            raise RuntimeError(msg)
+        try:
+            job = self.client.get_job(
+                request={
+                    'project_id': Env.GCLOUD_PROJECT,
+                    'region': Env.GCLOUD_REGION,
+                    'job_id': self.job_id,
+                },
+            )
+        except google.api_core.exceptions.NotFound:
+            return False
+        if job.status.state == ERROR_STATE:
+            msg = f'Job {self.task_name}-{self.run_id} entered ERROR state'
+            logger.error(msg)
+            logger.error(job.status.details)
+        return job.status.state == DONE_STATE
+
+    def run(self):
+        operation = self.client.submit_job_as_operation(
+            request={
+                'project_id': Env.GCLOUD_PROJECT,
+                'region': Env.GCLOUD_REGION,
+                'job': {
+                    'reference': {
+                        'job_id': self.job_id,
+                    },
+                    'placement': {
+                        'cluster_name': get_cluster_name(
+                            self.reference_genome,
+                            self.run_id,
+                        ),
+                    },
+                    'pyspark_job': {
+                        'main_python_file_uri': f'{SEQR_PIPELINE_RUNNER_BUILD}/bin/run_task.py',
+                        'args': [
+                            self.task_name,
+                            '--local-scheduler',
+                            *to_kebab_str_args(self),
+                        ],
+                        'python_file_uris': [
+                            f'{SEQR_PIPELINE_RUNNER_BUILD}/pyscripts.zip',
+                        ],
+                    },
+                },
+            },
+        )
+        wait_s = 0
+        while wait_s < TIMEOUT_S:
+            if operation.done():
+                operation.result()  # Will throw on failure!
+                msg = f'Finished {self.job_id}'
+                logger.info(msg)
+                break
+            logger.info(
+                f'Waiting for job completion {self.job_id}',
+            )
+            time.sleep(3)
+            wait_s += 3
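
For orientation, a minimal sketch of how a concrete job plugs into this base class (the subclass name and output path below are hypothetical, not part of this commit): a subclass only declares its output, and inherits client setup, job submission, polling, and the completion check.

import luigi

from v03_pipeline.lib.tasks.dataproc.base_run_job_on_dataproc import (
    BaseRunJobOnDataprocTask,
)
from v03_pipeline.lib.tasks.files import GCSorLocalTarget


class ExampleJobOnDataprocTask(BaseRunJobOnDataprocTask):
    # Hypothetical subclass for illustration. Its Dataproc job id becomes
    # 'ExampleJobOnDataprocTask-<run_id>', and the inherited run() submits
    # run_task.py with this task's kebab-cased Luigi parameters as job args.
    def output(self) -> luigi.Target:
        return GCSorLocalTarget('gs://example-bucket/example-marker')  # placeholder path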

v03_pipeline/lib/tasks/dataproc/create_dataproc_cluster.py

Lines changed: 23 additions & 13 deletions
@@ -1,5 +1,6 @@
 import time
 
+import google.api_core.exceptions
 import hail as hl
 import luigi
 from google.cloud import dataproc_v1 as dataproc
@@ -11,13 +12,15 @@
 from v03_pipeline.lib.tasks.base.base_loading_pipeline_params import (
     BaseLoadingPipelineParams,
 )
+from v03_pipeline.lib.tasks.dataproc.misc import get_cluster_name
 
-CLUSTER_NAME_PREFIX = 'pipeline-runner'
 DEBIAN_IMAGE = '2.1.33-debian11'
+ERROR_STATE = 'ERROR'
 HAIL_VERSION = hl.version().split('-')[0]
 INSTANCE_TYPE = 'n1-highmem-8'
 PKGS = '|'.join(pip_freeze.freeze())
-SUCCESS_STATE = 'RUNNING'
+RUNNING_STATE = 'RUNNING'
+TIMEOUT_S = 900
 
 logger = get_logger(__name__)
 
@@ -26,7 +29,7 @@ def get_cluster_config(reference_genome: ReferenceGenome, run_id: str):
     service_account_credentials = get_service_account_credentials()
     return {
         'project_id': Env.GCLOUD_PROJECT,
-        'cluster_name': f'{CLUSTER_NAME_PREFIX}-{reference_genome.value.lower()}-{run_id}',
+        'cluster_name': get_cluster_name(reference_genome, run_id),
         # Schema found at https://cloud.google.com/dataproc/docs/reference/rest/v1/ClusterConfig
         'config': {
             'gce_cluster_config': {
@@ -136,27 +139,32 @@ def __init__(self, *args, **kwargs):
         # https://cloud.google.com/dataproc/docs/tutorials/python-library-example
         self.client = dataproc.ClusterControllerClient(
             client_options={
-                'api_endpoint': f'{Env.GCLOUD_REGION}-dataproc.googleapis.com:443'.format(
-                    Env.GCLOUD_REGION,
-                ),
+                'api_endpoint': f'{Env.GCLOUD_REGION}-dataproc.googleapis.com:443',
             },
         )
 
     def complete(self) -> bool:
         if not self.dataset_type.requires_dataproc:
-            return True
+            msg = f'{self.dataset_type} should not require a dataproc cluster'
+            raise RuntimeError(msg)
         try:
-            client = self.client.get_cluster(
+            cluster = self.client.get_cluster(
                 request={
                     'project_id': Env.GCLOUD_PROJECT,
                     'region': Env.GCLOUD_REGION,
-                    'cluster_name': f'{CLUSTER_NAME_PREFIX}-{self.reference_genome.value.lower()}',
+                    'cluster_name': get_cluster_name(
+                        self.reference_genome,
+                        self.run_id,
+                    ),
                 },
             )
-        except Exception:  # noqa: BLE001
+        except google.api_core.exceptions.NotFound:
             return False
-        else:
-            return client.status.state == SUCCESS_STATE
+        if cluster.status.state == ERROR_STATE:
+            msg = f'Cluster {cluster.cluster_name} entered ERROR state'
+            logger.error(msg)
+        # This will return False when the cluster is "CREATING"
+        return cluster.status.state == RUNNING_STATE
 
     def run(self):
         operation = self.client.create_cluster(
@@ -166,11 +174,13 @@ def run(self):
                 'cluster': get_cluster_config(self.reference_genome, self.run_id),
             },
         )
-        while True:
+        wait_s = 0
+        while wait_s < TIMEOUT_S:
             if operation.done():
                 result = operation.result()  # Will throw on failure!
                 msg = f'Created cluster {result.cluster_name} with cluster uuid: {result.cluster_uuid}'
                 logger.info(msg)
                 break
             logger.info('Waiting for cluster spinup')
             time.sleep(3)
+            wait_s += 3
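
The run() change above replaces an unbounded while True loop with a bounded poll. A distilled sketch of that pattern, assuming a google.api_core-style long-running operation (a generic helper, not code from this commit):

import time

POLL_INTERVAL_S = 3


def wait_for_operation(operation, timeout_s: int) -> None:
    # Poll until the operation completes, giving up after timeout_s seconds;
    # result() re-raises the operation's error on failure, as in the loop above.
    wait_s = 0
    while wait_s < timeout_s:
        if operation.done():
            operation.result()
            return
        time.sleep(POLL_INTERVAL_S)
        wait_s += POLL_INTERVAL_S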

v03_pipeline/lib/tasks/dataproc/create_dataproc_cluster_test.py

Lines changed: 5 additions & 7 deletions
@@ -28,15 +28,12 @@ def test_dataset_type_unsupported(
         mock_cluster_controller: Mock,
         _: Mock,
     ) -> None:
-        worker = luigi.worker.Worker()
         task = CreateDataprocClusterTask(
             reference_genome=ReferenceGenome.GRCh38,
             dataset_type=DatasetType.MITO,
             run_id='1',
         )
-        worker.add(task)
-        worker.run()
-        self.assertTrue(task.complete())
+        self.assertRaises(RuntimeError, task.complete)
 
     def test_spinup_cluster_already_exists_failed(
         self,
@@ -45,7 +42,8 @@
     ) -> None:
         mock_client = mock_cluster_controller.return_value
         mock_client.get_cluster.return_value = SimpleNamespace(
-            status=SimpleNamespace(state='FAILED'),
+            status=SimpleNamespace(state='ERROR'),
+            cluster_name='abc',
         )
         mock_client.create_cluster.side_effect = (
             google.api_core.exceptions.AlreadyExists('cluster exists')
@@ -122,7 +120,7 @@ def test_spinup_cluster_doesnt_exist_success(
         operation = mock_client.create_cluster.return_value
         operation.done.side_effect = [False, True]
         operation.result.return_value = SimpleNamespace(
-            cluster_name='dataproc-cluster-1',
+            cluster_name='dataproc-cluster-5',
             cluster_uuid='12345',
         )
         worker = luigi.worker.Worker()
@@ -136,6 +134,6 @@
         mock_logger.info.assert_has_calls(
             [
                 call('Waiting for cluster spinup'),
-                call('Created cluster dataproc-cluster-1 with cluster uuid: 12345'),
+                call('Created cluster dataproc-cluster-5 with cluster uuid: 12345'),
             ],
         )
v03_pipeline/lib/tasks/dataproc/misc.py

Lines changed: 21 additions & 0 deletions
@@ -0,0 +1,21 @@
+import re
+
+import luigi
+
+from v03_pipeline.lib.model import ReferenceGenome
+
+CLUSTER_NAME_PREFIX = 'pipeline-runner'
+
+
+def get_cluster_name(reference_genome: ReferenceGenome, run_id: str):
+    return f'{CLUSTER_NAME_PREFIX}-{reference_genome.value.lower()}-{run_id}'
+
+
+def snake_to_kebab_arg(snake_string: str) -> str:
+    return '--' + re.sub(r'\_', '-', snake_string).lower()
+
+
+def to_kebab_str_args(task: luigi.Task):
+    return [
+        e for k, v in task.to_str_params().items() for e in (snake_to_kebab_arg(k), v)
+    ]
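
To illustrate what these helpers produce, the expected values below are inferred from the implementations above and are consistent with the test file later in this commit:

from v03_pipeline.lib.model import ReferenceGenome
from v03_pipeline.lib.tasks.dataproc.misc import get_cluster_name, snake_to_kebab_arg

# Assumes ReferenceGenome.GRCh38.value == 'GRCh38', as the test below suggests.
assert get_cluster_name(ReferenceGenome.GRCh38, 'run-1') == 'pipeline-runner-grch38-run-1'
assert snake_to_kebab_arg('skip_validation') == '--skip-validation'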
v03_pipeline/lib/tasks/dataproc/misc_test.py

Lines changed: 58 additions & 0 deletions
@@ -0,0 +1,58 @@
+import unittest
+from unittest.mock import Mock, patch
+
+from v03_pipeline.lib.model import DatasetType, ReferenceGenome, SampleType
+from v03_pipeline.lib.tasks.dataproc.misc import to_kebab_str_args
+from v03_pipeline.lib.tasks.dataproc.write_success_file_on_dataproc import (
+    WriteSuccessFileOnDataprocTask,
+)
+
+
+@patch(
+    'v03_pipeline.lib.tasks.dataproc.base_run_job_on_dataproc.dataproc.JobControllerClient',
+)
+class MiscTest(unittest.TestCase):
+    def test_to_kebab_str_args(self, _: Mock):
+        t = WriteSuccessFileOnDataprocTask(
+            reference_genome=ReferenceGenome.GRCh38,
+            dataset_type=DatasetType.SNV_INDEL,
+            sample_type=SampleType.WGS,
+            callset_path='test_callset',
+            project_guids=['R0113_test_project'],
+            project_remap_paths=['test_remap'],
+            project_pedigree_paths=['test_pedigree'],
+            run_id='a_misc_run',
+        )
+        self.assertListEqual(
+            to_kebab_str_args(t),
+            [
+                '--reference-genome',
+                'GRCh38',
+                '--dataset-type',
+                'SNV_INDEL',
+                '--run-id',
+                'a_misc_run',
+                '--sample-type',
+                'WGS',
+                '--callset-path',
+                'test_callset',
+                '--project-guids',
+                '["R0113_test_project"]',
+                '--project-remap-paths',
+                '["test_remap"]',
+                '--project-pedigree-paths',
+                '["test_pedigree"]',
+                '--ignore-missing-samples-when-remapping',
+                'False',
+                '--skip-check-sex-and-relatedness',
+                'False',
+                '--skip-expect-filters',
+                'False',
+                '--skip-expect-tdr-metrics',
+                'False',
+                '--skip-validation',
+                'False',
+                '--is-new-gcnv-joint-call',
+                'False',
+            ],
+        )
v03_pipeline/lib/tasks/dataproc/write_success_file_on_dataproc.py

Lines changed: 22 additions & 0 deletions
@@ -0,0 +1,22 @@
+import luigi
+
+from v03_pipeline.lib.paths import pipeline_run_success_file_path
+from v03_pipeline.lib.tasks.base.base_loading_run_params import (
+    BaseLoadingRunParams,
+)
+from v03_pipeline.lib.tasks.dataproc.base_run_job_on_dataproc import (
+    BaseRunJobOnDataprocTask,
+)
+from v03_pipeline.lib.tasks.files import GCSorLocalTarget
+
+
+@luigi.util.inherits(BaseLoadingRunParams)
+class WriteSuccessFileOnDataprocTask(BaseRunJobOnDataprocTask):
+    def output(self) -> luigi.Target:
+        return GCSorLocalTarget(
+            pipeline_run_success_file_path(
+                self.reference_genome,
+                self.dataset_type,
+                self.run_id,
+            ),
+        )
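
A sketch of launching this task from a driver process (parameter values are the placeholders used in the test above; the pipeline's real entry point is outside this diff):

import luigi

from v03_pipeline.lib.model import DatasetType, ReferenceGenome, SampleType
from v03_pipeline.lib.tasks.dataproc.write_success_file_on_dataproc import (
    WriteSuccessFileOnDataprocTask,
)

luigi.build(
    [
        WriteSuccessFileOnDataprocTask(
            reference_genome=ReferenceGenome.GRCh38,
            dataset_type=DatasetType.SNV_INDEL,
            sample_type=SampleType.WGS,
            callset_path='test_callset',  # placeholder
            project_guids=['R0113_test_project'],  # placeholder
            project_remap_paths=['test_remap'],  # placeholder
            project_pedigree_paths=['test_pedigree'],  # placeholder
            run_id='a_misc_run',  # placeholder
        ),
    ],
    local_scheduler=True,
)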
