diff --git a/requirements.in b/requirements.in index af19fd655..6d5abb8b9 100644 --- a/requirements.in +++ b/requirements.in @@ -1,7 +1,6 @@ -elasticsearch==7.9.1 -google-api-python-client>=1.8.0 hail==0.2.132 -luigi>=3.4.0 +luigi==3.5.2 gnomad==0.6.4 aiofiles==24.1.0 pydantic==2.8.2 +google-cloud-dataproc==5.14.0 diff --git a/requirements.txt b/requirements.txt index a565a7c41..9fad612cc 100644 --- a/requirements.txt +++ b/requirements.txt @@ -2,7 +2,7 @@ # This file is autogenerated by pip-compile with Python 3.10 # by the following command: # -# pip-compile requirements.in +# pip-compile --resolver=backtracking requirements.in # aiodns==2.0.0 # via hail @@ -28,7 +28,7 @@ avro==1.11.3 # via hail azure-common==1.1.28 # via azure-mgmt-storage -azure-core==1.31.0 +azure-core==1.32.0 # via # azure-identity # azure-mgmt-core @@ -36,7 +36,7 @@ azure-core==1.31.0 # msrest azure-identity==1.19.0 # via hail -azure-mgmt-core==1.4.0 +azure-mgmt-core==1.5.0 # via azure-mgmt-storage azure-mgmt-storage==20.1.0 # via hail @@ -44,9 +44,9 @@ azure-storage-blob==12.23.1 # via hail bokeh==3.3.4 # via hail -boto3==1.35.48 +boto3==1.35.53 # via hail -botocore==1.35.48 +botocore==1.35.53 # via # boto3 # hail @@ -55,7 +55,6 @@ cachetools==5.5.0 # via google-auth certifi==2024.8.30 # via - # elasticsearch # msrest # requests cffi==1.17.1 @@ -86,10 +85,6 @@ deprecated==1.2.14 # via hail dill==0.3.9 # via hail -docutils==0.21.2 - # via python-daemon -elasticsearch==7.9.1 - # via -r requirements.in exceptiongroup==1.2.2 # via ipython executing==2.1.0 @@ -101,38 +96,44 @@ frozenlist==1.5.0 # hail gnomad==0.6.4 # via -r requirements.in -google-api-core==2.21.0 - # via google-api-python-client -google-api-python-client==2.149.0 - # via -r requirements.in +google-api-core[grpc]==2.22.0 + # via google-cloud-dataproc google-auth==2.35.0 # via # google-api-core - # google-api-python-client - # google-auth-httplib2 # google-auth-oauthlib + # google-cloud-dataproc # hail -google-auth-httplib2==0.2.0 - # via google-api-python-client google-auth-oauthlib==0.8.0 # via hail -googleapis-common-protos==1.65.0 +google-cloud-dataproc==5.14.0 + # via -r requirements.in +googleapis-common-protos[grpc]==1.65.0 + # via + # google-api-core + # grpc-google-iam-v1 + # grpcio-status +grpc-google-iam-v1==0.13.1 + # via google-cloud-dataproc +grpcio==1.67.1 + # via + # google-api-core + # googleapis-common-protos + # grpc-google-iam-v1 + # grpcio-status +grpcio-status==1.48.2 # via google-api-core hail==0.2.132 # via -r requirements.in hdbscan==0.8.39 # via gnomad -httplib2==0.22.0 - # via - # google-api-python-client - # google-auth-httplib2 humanize==1.1.0 # via hail idna==3.10 # via # requests # yarl -ipython==8.28.0 +ipython==8.29.0 # via ipywidgets ipywidgets==8.1.5 # via gnomad @@ -218,11 +219,16 @@ prompt-toolkit==3.0.48 propcache==0.2.0 # via yarl proto-plus==1.25.0 - # via google-api-core + # via + # google-api-core + # google-cloud-dataproc protobuf==3.20.2 # via # google-api-core + # google-cloud-dataproc # googleapis-common-protos + # grpc-google-iam-v1 + # grpcio-status # hail # proto-plus ptyprocess==0.7.0 @@ -251,11 +257,9 @@ pygments==2.18.0 # rich pyjwt[crypto]==2.9.0 # via msal -pyparsing==3.2.0 - # via httplib2 pyspark==3.5.3 # via hail -python-daemon==3.0.1 +python-daemon==3.1.0 # via luigi python-dateutil==2.9.0.post0 # via @@ -348,12 +352,9 @@ typing-extensions==4.12.2 # typer tzdata==2024.2 # via pandas -uritemplate==4.1.1 - # via google-api-python-client urllib3==2.2.3 # via # botocore - # elasticsearch # requests 
 uvloop==0.21.0
     # via hail
@@ -365,7 +366,7 @@ wrapt==1.16.0
     # via deprecated
 xyzservices==2024.9.0
     # via bokeh
-yarl==1.16.0
+yarl==1.17.1
     # via aiohttp
 
 # The following packages are considered to be unsafe in a requirements file:
diff --git a/v03_pipeline/bin/dataproc_vep_init.bash b/v03_pipeline/bin/dataproc_vep_init.bash
index b0332b9e9..541c2eb63 100755
--- a/v03_pipeline/bin/dataproc_vep_init.bash
+++ b/v03_pipeline/bin/dataproc_vep_init.bash
@@ -14,7 +14,7 @@ set -x
 
 export PROJECT="$(gcloud config get-value project)"
-export ENVIRONMENT="$(/usr/share/google/get_metadata_value attributes/ENVIRONMENT)"
+export DEPLOYMENT_TYPE="$(/usr/share/google/get_metadata_value attributes/DEPLOYMENT_TYPE)"
 export REFERENCE_GENOME="$(/usr/share/google/get_metadata_value attributes/REFERENCE_GENOME)"
 export PIPELINE_RUNNER_APP_VERSION="$(/usr/share/google/get_metadata_value attributes/PIPELINE_RUNNER_APP_VERSION)"
@@ -53,10 +53,10 @@ EOF
 gcc -Wall -Werror -O2 /vep.c -o /vep
 chmod u+s /vep
 
-gcloud storage cp gs://seqr-pipeline-runner-builds/$ENVIRONMENT/$PIPELINE_RUNNER_APP_VERSION/bin/download_vep_reference_data.bash /download_vep_reference_data.bash
+gcloud storage cp gs://seqr-pipeline-runner-builds/$DEPLOYMENT_TYPE/$PIPELINE_RUNNER_APP_VERSION/bin/download_vep_reference_data.bash /download_vep_reference_data.bash
 chmod +x /download_vep_reference_data.bash
 ./download_vep_reference_data.bash $REFERENCE_GENOME
 
-gcloud storage cp gs://seqr-pipeline-runner-builds/$ENVIRONMENT/$PIPELINE_RUNNER_APP_VERSION/bin/vep /vep.bash
+gcloud storage cp gs://seqr-pipeline-runner-builds/$DEPLOYMENT_TYPE/$PIPELINE_RUNNER_APP_VERSION/bin/vep /vep.bash
 chmod +x /vep.bash
diff --git a/v03_pipeline/lib/model/dataset_type.py b/v03_pipeline/lib/model/dataset_type.py
index daa31ab1b..8bad768bb 100644
--- a/v03_pipeline/lib/model/dataset_type.py
+++ b/v03_pipeline/lib/model/dataset_type.py
@@ -361,6 +361,10 @@ def lookup_table_annotation_fns(self) -> list[Callable[..., hl.Expression]]:
     def should_send_to_allele_registry(self):
         return self == DatasetType.SNV_INDEL
 
+    @property
+    def requires_dataproc(self):
+        return self == DatasetType.SNV_INDEL
+
     @property
     def should_export_to_vcf(self):
         return self == DatasetType.SV
diff --git a/v03_pipeline/lib/model/environment.py b/v03_pipeline/lib/model/environment.py
index c629521f5..d95d7a27c 100644
--- a/v03_pipeline/lib/model/environment.py
+++ b/v03_pipeline/lib/model/environment.py
@@ -1,5 +1,6 @@
 import os
 from dataclasses import dataclass
+from typing import Literal
 
 # NB: using os.environ.get inside the dataclass defaults gives a lint error.
 GRCH37_TO_GRCH38_LIFTOVER_REF_PATH = os.environ.get(
@@ -41,6 +42,14 @@
     'CLINGEN_ALLELE_REGISTRY_PASSWORD',
     '',
 )
+DEPLOYMENT_TYPE = os.environ.get('DEPLOYMENT_TYPE', 'prod')
+GCLOUD_DATAPROC_SECONDARY_WORKERS = int(
+    os.environ.get('GCLOUD_DATAPROC_SECONDARY_WORKERS', '5'),
+)
+GCLOUD_PROJECT = os.environ.get('GCLOUD_PROJECT')
+GCLOUD_ZONE = os.environ.get('GCLOUD_ZONE')
+GCLOUD_REGION = os.environ.get('GCLOUD_REGION')
+PIPELINE_RUNNER_APP_VERSION = os.environ.get('PIPELINE_RUNNER_APP_VERSION', 'latest')
 
 # Feature Flags
 ACCESS_PRIVATE_REFERENCE_DATASETS = (
@@ -62,7 +71,12 @@ class Env:
     CHECK_SEX_AND_RELATEDNESS: bool = CHECK_SEX_AND_RELATEDNESS
     CLINGEN_ALLELE_REGISTRY_LOGIN: str | None = CLINGEN_ALLELE_REGISTRY_LOGIN
     CLINGEN_ALLELE_REGISTRY_PASSWORD: str | None = CLINGEN_ALLELE_REGISTRY_PASSWORD
+    DEPLOYMENT_TYPE: Literal['dev', 'prod'] = DEPLOYMENT_TYPE
     EXPECT_WES_FILTERS: bool = EXPECT_WES_FILTERS
+    GCLOUD_DATAPROC_SECONDARY_WORKERS: int = GCLOUD_DATAPROC_SECONDARY_WORKERS
+    GCLOUD_PROJECT: str | None = GCLOUD_PROJECT
+    GCLOUD_ZONE: str | None = GCLOUD_ZONE
+    GCLOUD_REGION: str | None = GCLOUD_REGION
     GRCH37_TO_GRCH38_LIFTOVER_REF_PATH: str = GRCH37_TO_GRCH38_LIFTOVER_REF_PATH
     GRCH38_TO_GRCH37_LIFTOVER_REF_PATH: str = GRCH38_TO_GRCH37_LIFTOVER_REF_PATH
     HAIL_BACKEND_SERVICE_HOSTNAME: str | None = HAIL_BACKEND_SERVICE_HOSTNAME
@@ -71,6 +85,7 @@ class Env:
     HAIL_SEARCH_DATA_DIR: str = HAIL_SEARCH_DATA_DIR
     INCLUDE_PIPELINE_VERSION_IN_PREFIX: bool = INCLUDE_PIPELINE_VERSION_IN_PREFIX
     LOADING_DATASETS_DIR: str = LOADING_DATASETS_DIR
+    PIPELINE_RUNNER_APP_VERSION: str = PIPELINE_RUNNER_APP_VERSION
     PRIVATE_REFERENCE_DATASETS_DIR: str = PRIVATE_REFERENCE_DATASETS_DIR
     REFERENCE_DATASETS_DIR: str = REFERENCE_DATASETS_DIR
     SHOULD_TRIGGER_HAIL_BACKEND_RELOAD: bool = SHOULD_TRIGGER_HAIL_BACKEND_RELOAD
diff --git a/v03_pipeline/lib/tasks/dataproc/__init__.py b/v03_pipeline/lib/tasks/dataproc/__init__.py
new file mode 100644
index 000000000..e69de29bb
diff --git a/v03_pipeline/lib/tasks/dataproc/create_dataproc_cluster.py b/v03_pipeline/lib/tasks/dataproc/create_dataproc_cluster.py
new file mode 100644
index 000000000..3a1d6c9da
--- /dev/null
+++ b/v03_pipeline/lib/tasks/dataproc/create_dataproc_cluster.py
@@ -0,0 +1,166 @@
+import time
+
+import hail as hl
+import luigi
+from google.cloud import dataproc_v1 as dataproc
+from pip._internal.operations import freeze as pip_freeze
+
+from v03_pipeline.lib.logger import get_logger
+from v03_pipeline.lib.model import Env, ReferenceGenome
+from v03_pipeline.lib.tasks.base.base_loading_pipeline_params import (
+    BaseLoadingPipelineParams,
+)
+
+CLUSTER_NAME_PREFIX = 'pipeline-runner'
+DEBIAN_IMAGE = '2.1.33-debian11'
+HAIL_VERSION = hl.version().split('-')[0]
+INSTANCE_TYPE = 'n1-highmem-8'
+PKGS = '|'.join(pip_freeze.freeze())
+SUCCESS_STATE = 'RUNNING'
+
+logger = get_logger(__name__)
+
+
+def get_cluster_config(reference_genome: ReferenceGenome, run_id: str):
+    return {
+        'project_id': Env.GCLOUD_PROJECT,
+        'cluster_name': f'{CLUSTER_NAME_PREFIX}-{reference_genome.value.lower()}-{run_id}',
+        'config': {
+            'gce_cluster_config': {
+                'zone_uri': Env.GCLOUD_ZONE,
+                'metadata': {
+                    'WHEEL': f'gs://hail-common/hailctl/dataproc/{HAIL_VERSION}/hail-{HAIL_VERSION}-py3-none-any.whl',
+                    'PKGS': PKGS,
+                    'DEPLOYMENT_TYPE': Env.DEPLOYMENT_TYPE,
+                    'REFERENCE_GENOME': reference_genome.value,
+                    'PIPELINE_RUNNER_APP_VERSION': Env.PIPELINE_RUNNER_APP_VERSION,
+                },
+            },
+            'master_config': {
+                'num_instances': 1,
+                'machine_type_uri': INSTANCE_TYPE,
+                'disk_config': {
+                    'boot_disk_type': 'pd-standard',
+                    'boot_disk_size_gb': 100,
+                },
+            },
+            'worker_config': {
+                'num_instances': 2,
+                'machine_type_uri': INSTANCE_TYPE,
+                'disk_config': {
+                    'boot_disk_type': 'pd-standard',
+                    'boot_disk_size_gb': 100,
+                },
+            },
+            'secondary_worker_config': {
+                'num_instances': Env.GCLOUD_DATAPROC_SECONDARY_WORKERS,
+                'machine_type_uri': INSTANCE_TYPE,
+                'disk_config': {
+                    'boot_disk_type': 'pd-standard',
+                    'boot_disk_size_gb': 100,
+                },
+                'is_preemptible': True,
+                'preemptibility': 'PREEMPTIBLE',
+            },
+            'software_config': {
+                'image_version': DEBIAN_IMAGE,
+                'properties': {
+                    'spark:spark.driver.maxResultSize': '0',
+                    'spark:spark.task.maxFailures': '20',
+                    'spark:spark.kryoserializer.buffer.max': '2g',
+                    'spark:spark.driver.extraJavaOptions': '-Xss16M',
+                    'spark:spark.executor.extraJavaOptions': '-Xss16M',
+                    'hdfs:dfs.replication': '1',
+                    'dataproc:dataproc.logging.stackdriver.enable': 'false',
+                    'dataproc:dataproc.monitoring.stackdriver.enable': 'false',
+                    'spark:spark.driver.memory': '41g',
+                    'yarn:yarn.nodemanager.resource.memory-mb': '50585',
+                    'yarn:yarn.scheduler.maximum-allocation-mb': '25292',
+                    'spark:spark.executor.cores': '4',
+                    'spark:spark.executor.memory': '10117m',
+                    'spark:spark.executor.memoryOverhead': '15175m',
+                    'spark:spark.memory.storageFraction': '0.2',
+                    'spark:spark.executorEnv.HAIL_WORKER_OFF_HEAP_MEMORY_PER_CORE_MB': '6323',
+                    'spark:spark.speculation': 'true',
+                    'spark-env:ACCESS_PRIVATE_REFERENCE_DATASETS': '1'
+                    if Env.ACCESS_PRIVATE_REFERENCE_DATASETS
+                    else '0',
+                    'spark-env:CHECK_SEX_AND_RELATEDNESS': '1'
+                    if Env.CHECK_SEX_AND_RELATEDNESS
+                    else '0',
+                    'spark-env:EXPECT_WES_FILTERS': '1'
+                    if Env.EXPECT_WES_FILTERS
+                    else '0',
+                    'spark-env:HAIL_SEARCH_DATA_DIR': Env.HAIL_SEARCH_DATA_DIR,
+                    'spark-env:HAIL_TMP_DIR': Env.HAIL_TMP_DIR,
+                    'spark-env:INCLUDE_PIPELINE_VERSION_IN_PREFIX': '1'
+                    if Env.INCLUDE_PIPELINE_VERSION_IN_PREFIX
+                    else '0',
+                    'spark-env:LOADING_DATASETS_DIR': Env.LOADING_DATASETS_DIR,
+                    'spark-env:PRIVATE_REFERENCE_DATASETS_DIR': Env.PRIVATE_REFERENCE_DATASETS_DIR,
+                    'spark-env:REFERENCE_DATASETS_DIR': Env.REFERENCE_DATASETS_DIR,
+                    'spark-env:CLINGEN_ALLELE_REGISTRY_LOGIN': Env.CLINGEN_ALLELE_REGISTRY_LOGIN,
+                    'spark-env:CLINGEN_ALLELE_REGISTRY_PASSWORD': Env.CLINGEN_ALLELE_REGISTRY_PASSWORD,
+                },
+            },
+            'lifecycle_config': {'idle_delete_ttl': {'seconds': 1200}},
+            'encryption_config': {},
+            'autoscaling_config': {},
+            'endpoint_config': {},
+            'initialization_actions': [
+                {
+                    'executable_file': f'gs://seqr-pipeline-runner-builds/{Env.DEPLOYMENT_TYPE}/{Env.PIPELINE_RUNNER_APP_VERSION}/bin/dataproc_vep_init.bash',
+                    'execution_timeout': {'seconds': 1200},
+                },
+            ],
+        },
+    }
+
+
+@luigi.util.inherits(BaseLoadingPipelineParams)
+class CreateDataprocClusterTask(luigi.Task):
+    # NB: the luigi.contrib.dataproc module was old and bad,
+    # so we built our own shim.
+    run_id = luigi.Parameter()
+
+    def __init__(self, *args, **kwargs):
+        super().__init__(*args, **kwargs)
+        # https://cloud.google.com/dataproc/docs/tutorials/python-library-example
+        self.client = dataproc.ClusterControllerClient(
+            client_options={
+                'api_endpoint': f'{Env.GCLOUD_REGION}-dataproc.googleapis.com:443',
+            },
+        )
+
+    def complete(self) -> bool:
+        if not self.dataset_type.requires_dataproc:
+            return True
+        try:
+            cluster = self.client.get_cluster(
+                request={
+                    'project_id': Env.GCLOUD_PROJECT,
+                    'region': Env.GCLOUD_REGION,
+                    'cluster_name': f'{CLUSTER_NAME_PREFIX}-{self.reference_genome.value.lower()}-{self.run_id}',
+                },
+            )
+        except Exception:  # noqa: BLE001
+            return False
+        else:
+            return cluster.status.state == SUCCESS_STATE
+
+    def run(self):
+        operation = self.client.create_cluster(
+            request={
+                'project_id': Env.GCLOUD_PROJECT,
+                'region': Env.GCLOUD_REGION,
+                'cluster': get_cluster_config(self.reference_genome, self.run_id),
+            },
+        )
+        while True:
+            if operation.done():
+                result = operation.result()  # Will throw on failure!
+                msg = f'Created cluster {result.cluster_name} with cluster uuid: {result.cluster_uuid}'
+                logger.info(msg)
+                break
+            logger.info('Waiting for cluster spinup')
+            time.sleep(3)
diff --git a/v03_pipeline/lib/tasks/dataproc/create_dataproc_cluster_test.py b/v03_pipeline/lib/tasks/dataproc/create_dataproc_cluster_test.py
new file mode 100644
index 000000000..bb447e0d6
--- /dev/null
+++ b/v03_pipeline/lib/tasks/dataproc/create_dataproc_cluster_test.py
@@ -0,0 +1,125 @@
+import unittest
+from types import SimpleNamespace
+from unittest.mock import Mock, call, patch
+
+import google.api_core.exceptions
+import luigi
+
+from v03_pipeline.lib.model import DatasetType, ReferenceGenome
+from v03_pipeline.lib.tasks.dataproc.create_dataproc_cluster import (
+    CreateDataprocClusterTask,
+)
+
+
+@patch(
+    'v03_pipeline.lib.tasks.dataproc.create_dataproc_cluster.dataproc.ClusterControllerClient',
+)
+class CreateDataprocClusterTaskTest(unittest.TestCase):
+    def test_dataset_type_unsupported(self, mock_cluster_controller: Mock) -> None:
+        worker = luigi.worker.Worker()
+        task = CreateDataprocClusterTask(
+            reference_genome=ReferenceGenome.GRCh38,
+            dataset_type=DatasetType.MITO,
+            run_id='1',
+        )
+        worker.add(task)
+        worker.run()
+        self.assertTrue(task.complete())
+
+    def test_spinup_cluster_already_exists_failed(
+        self,
+        mock_cluster_controller: Mock,
+    ) -> None:
+        mock_client = mock_cluster_controller.return_value
+        mock_client.get_cluster.return_value = SimpleNamespace(
+            status=SimpleNamespace(state='FAILED'),
+        )
+        mock_client.create_cluster.side_effect = (
+            google.api_core.exceptions.AlreadyExists('cluster exists')
+        )
+        worker = luigi.worker.Worker()
+        task = CreateDataprocClusterTask(
+            reference_genome=ReferenceGenome.GRCh38,
+            dataset_type=DatasetType.SNV_INDEL,
+            run_id='2',
+        )
+        worker.add(task)
+        worker.run()
+        self.assertFalse(task.complete())
+
+    def test_spinup_cluster_already_exists_success(
+        self,
+        mock_cluster_controller: Mock,
+    ) -> None:
+        mock_client = mock_cluster_controller.return_value
+        mock_client.get_cluster.return_value = SimpleNamespace(
+            status=SimpleNamespace(state='RUNNING'),
+        )
+        mock_client.create_cluster.side_effect = (
+            google.api_core.exceptions.AlreadyExists('cluster exists')
+        )
+        worker = luigi.worker.Worker()
+        task = CreateDataprocClusterTask(
+            reference_genome=ReferenceGenome.GRCh38,
+            dataset_type=DatasetType.SNV_INDEL,
+            run_id='3',
+        )
+        worker.add(task)
+        worker.run()
+        self.assertTrue(task.complete())
+
+    @patch('v03_pipeline.lib.tasks.dataproc.create_dataproc_cluster.logger')
+    def test_spinup_cluster_doesnt_exist_failed(
+        self,
+        mock_logger: Mock,
+        mock_cluster_controller: Mock,
+    ) -> None:
+        mock_client = mock_cluster_controller.return_value
+        mock_client.get_cluster.side_effect = google.api_core.exceptions.NotFound(
+            'cluster not found',
+        )
+        operation = mock_client.create_cluster.return_value
+        operation.done.side_effect = [False, True]
+        operation.result.side_effect = Exception('SpinupFailed')
+
+        worker = luigi.worker.Worker()
+        task = CreateDataprocClusterTask(
+            reference_genome=ReferenceGenome.GRCh38,
+            dataset_type=DatasetType.SNV_INDEL,
+            run_id='4',
+        )
+        worker.add(task)
+        worker.run()
+        self.assertFalse(task.complete())
+        mock_logger.info.assert_has_calls([call('Waiting for cluster spinup')])
+
+    @patch('v03_pipeline.lib.tasks.dataproc.create_dataproc_cluster.logger')
+    def test_spinup_cluster_doesnt_exist_success(
+        self,
+        mock_logger: Mock,
+        mock_cluster_controller: Mock,
+    ) -> None:
+        mock_client = mock_cluster_controller.return_value
+        mock_client.get_cluster.side_effect = google.api_core.exceptions.NotFound(
+            'cluster not found',
+        )
+        operation = mock_client.create_cluster.return_value
+        operation.done.side_effect = [False, True]
+        operation.result.return_value = SimpleNamespace(
+            cluster_name='dataproc-cluster-1',
+            cluster_uuid='12345',
+        )
+        worker = luigi.worker.Worker()
+        task = CreateDataprocClusterTask(
+            reference_genome=ReferenceGenome.GRCh38,
+            dataset_type=DatasetType.SNV_INDEL,
+            run_id='5',
+        )
+        worker.add(task)
+        worker.run()
+        mock_logger.info.assert_has_calls(
+            [
+                call('Waiting for cluster spinup'),
+                call('Created cluster dataproc-cluster-1 with cluster uuid: 12345'),
+            ],
+        )
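
Note (not part of the diff): a minimal sketch of how a downstream task might consume CreateDataprocClusterTask, assuming it shares BaseLoadingPipelineParams and a run_id parameter. The RunPipelineOnDataprocTask name and the JobControllerClient mention are hypothetical, for illustration only.

import luigi
import luigi.util

from v03_pipeline.lib.tasks.base.base_loading_pipeline_params import (
    BaseLoadingPipelineParams,
)
from v03_pipeline.lib.tasks.dataproc.create_dataproc_cluster import (
    CreateDataprocClusterTask,
)


@luigi.util.inherits(BaseLoadingPipelineParams)
class RunPipelineOnDataprocTask(luigi.Task):  # hypothetical downstream task
    run_id = luigi.Parameter()

    def requires(self):
        # Only dataset types that require Dataproc (currently SNV_INDEL)
        # need the cluster to exist before job submission.
        if self.dataset_type.requires_dataproc:
            return [self.clone(CreateDataprocClusterTask)]
        return []

    def run(self):
        # Submit the Hail job to the cluster here, e.g. with
        # google.cloud.dataproc_v1.JobControllerClient.submit_job_as_operation.
        ...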