Skip to content

Commit 52d5372

Browse files
committed
bugfix cluster and job state
1 parent e146cf7 commit 52d5372

File tree

4 files changed

+35
-16
lines changed

4 files changed

+35
-16
lines changed

v03_pipeline/lib/tasks/dataproc/base_run_job_on_dataproc.py

Lines changed: 10 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
import time
22

33
import google.api_core.exceptions
4+
import google.cloud.dataproc_v1.types.jobs
45
import luigi
56
from google.cloud import dataproc_v1 as dataproc
67

@@ -14,8 +15,6 @@
1415
)
1516
from v03_pipeline.lib.tasks.dataproc.misc import get_cluster_name, to_kebab_str_args
1617

17-
DONE_STATE = 'DONE'
18-
ERROR_STATE = 'ERROR'
1918
SEQR_PIPELINE_RUNNER_BUILD = f'gs://seqr-pipeline-runner-builds/{Env.DEPLOYMENT_TYPE}/{Env.PIPELINE_RUNNER_APP_VERSION}'
2019
TIMEOUT_S = 172800 # 2 days
2120

@@ -57,11 +56,17 @@ def complete(self) -> bool:
5756
)
5857
except google.api_core.exceptions.NotFound:
5958
return False
60-
if job.status.state == ERROR_STATE:
61-
msg = f'Job {self.task.task_family}-{self.run_id} entered ERROR state'
59+
if job.status.state in {
60+
google.cloud.dataproc_v1.types.jobs.JobStatus.State.CANCELLED,
61+
google.cloud.dataproc_v1.types.jobs.JobStatus.State.ERROR,
62+
google.cloud.dataproc_v1.types.jobs.JobStatus.State.ATTEMPT_FAILURE,
63+
}:
64+
msg = f'Job {self.task.task_family}-{self.run_id} entered {job.status.state!s} state'
6265
logger.error(msg)
6366
logger.error(job.status.details)
64-
return job.status.state == DONE_STATE
67+
return (
68+
job.status.state == google.cloud.dataproc_v1.types.jobs.JobStatus.State.DONE
69+
)
6570

6671
def run(self):
6772
operation = self.client.submit_job_as_operation(

v03_pipeline/lib/tasks/dataproc/create_dataproc_cluster.py

Lines changed: 13 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
import time
22

33
import google.api_core.exceptions
4+
import google.cloud.dataproc_v1.types.clusters
45
import hail as hl
56
import luigi
67
from google.cloud import dataproc_v1 as dataproc
@@ -15,11 +16,9 @@
1516
from v03_pipeline.lib.tasks.dataproc.misc import get_cluster_name
1617

1718
DEBIAN_IMAGE = '2.1.33-debian11'
18-
ERROR_STATE = 'ERROR'
1919
HAIL_VERSION = hl.version().split('-')[0]
2020
INSTANCE_TYPE = 'n1-highmem-8'
2121
PKGS = '|'.join(pip_freeze.freeze())
22-
RUNNING_STATE = 'RUNNING'
2322
TIMEOUT_S = 900
2423

2524
logger = get_logger(__name__)
@@ -160,11 +159,20 @@ def complete(self) -> bool:
160159
)
161160
except google.api_core.exceptions.NotFound:
162161
return False
163-
if cluster.status.state == ERROR_STATE:
164-
msg = f'Cluster {cluster.cluster_name} entered ERROR state'
162+
if cluster.status.state in {
163+
google.cloud.dataproc_v1.types.clusters.ClusterStatus.State.UNKNOWN,
164+
google.cloud.dataproc_v1.types.clusters.ClusterStatus.State.ERROR,
165+
google.cloud.dataproc_v1.types.clusters.ClusterStatus.State.ERROR_DUE_TO_UPDATE,
166+
}:
167+
msg = (
168+
f'Cluster {cluster.cluster_name} entered {cluster.status.state!s} state'
169+
)
165170
logger.error(msg)
166171
# This will return False when the cluster is "CREATING"
167-
return cluster.status.state == RUNNING_STATE
172+
return (
173+
cluster.status.state
174+
== google.cloud.dataproc_v1.types.clusters.ClusterStatus.State.RUNNING
175+
)
168176

169177
def run(self):
170178
if not Env.GCLOUD_PROJECT or not Env.GCLOUD_REGION or not Env.GCLOUD_ZONE:

v03_pipeline/lib/tasks/dataproc/create_dataproc_cluster_test.py

Lines changed: 7 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@
33
from unittest.mock import Mock, call, patch
44

55
import google.api_core.exceptions
6+
import google.cloud.dataproc_v1.types.clusters
67
import luigi
78

89
from v03_pipeline.lib.misc.gcp import SOCIAL_AUTH_GOOGLE_OAUTH2_SCOPE
@@ -48,7 +49,9 @@ def test_spinup_cluster_already_exists_failed(
4849
) -> None:
4950
mock_client = mock_cluster_controller.return_value
5051
mock_client.get_cluster.return_value = SimpleNamespace(
51-
status=SimpleNamespace(state='ERROR'),
52+
status=SimpleNamespace(
53+
state=google.cloud.dataproc_v1.types.clusters.ClusterStatus.State.ERROR
54+
),
5255
cluster_name='abc',
5356
)
5457
mock_client.create_cluster.side_effect = (
@@ -71,7 +74,9 @@ def test_spinup_cluster_already_exists_success(
7174
) -> None:
7275
mock_client = mock_cluster_controller.return_value
7376
mock_client.get_cluster.return_value = SimpleNamespace(
74-
status=SimpleNamespace(state='RUNNING'),
77+
status=SimpleNamespace(
78+
state=google.cloud.dataproc_v1.types.clusters.ClusterStatus.State.RUNNING
79+
),
7580
)
7681
mock_client.create_cluster.side_effect = (
7782
google.api_core.exceptions.AlreadyExists('cluster exists')

v03_pipeline/lib/tasks/dataproc/run_pipeline_on_dataproc_test.py

Lines changed: 5 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@
33
from unittest.mock import Mock, call, patch
44

55
import google.api_core.exceptions
6+
import google.cloud.dataproc_v1.types.jobs
67
import luigi
78

89
from v03_pipeline.lib.model import DatasetType, ReferenceGenome, SampleType
@@ -30,7 +31,7 @@ def test_job_already_exists_failed(
3031
mock_client = mock_job_controller_client.return_value
3132
mock_client.get_job.return_value = SimpleNamespace(
3233
status=SimpleNamespace(
33-
state='ERROR',
34+
state=google.cloud.dataproc_v1.types.jobs.JobStatus.State.ERROR,
3435
details='Google Cloud Dataproc Agent reports job failure. If logs are available, they can be found at...',
3536
),
3637
)
@@ -54,7 +55,7 @@ def test_job_already_exists_failed(
5455
mock_logger.error.assert_has_calls(
5556
[
5657
call(
57-
'Job RunPipelineTask-manual__2024-04-03 entered ERROR state',
58+
'Job RunPipelineTask-manual__2024-04-03 entered State.ERROR state',
5859
),
5960
],
6061
)
@@ -67,7 +68,7 @@ def test_job_already_exists_success(
6768
mock_create_dataproc_cluster.return_value = MockCompleteTask()
6869
mock_client = mock_job_controller_client.return_value
6970
mock_client.get_job.return_value = SimpleNamespace(
70-
status=SimpleNamespace(state='DONE'),
71+
status=SimpleNamespace(state=google.cloud.dataproc_v1.types.jobs.JobStatus.State.DONE),
7172
)
7273
worker = luigi.worker.Worker()
7374
task = RunPipelineOnDataprocTask(
@@ -135,7 +136,7 @@ def test_job_success(
135136
'job not found',
136137
),
137138
SimpleNamespace(
138-
status=SimpleNamespace(state='DONE'),
139+
status=SimpleNamespace(state=google.cloud.dataproc_v1.types.jobs.JobStatus.State.DONE),
139140
),
140141
]
141142
operation = mock_client.submit_job_as_operation.return_value

0 commit comments

Comments
 (0)