From 533d1d4569bc69a958d6d7ec489aa8a36a9c1279 Mon Sep 17 00:00:00 2001
From: Benjamin Blankenmeister
Date: Fri, 13 Dec 2024 09:41:27 -0500
Subject: [PATCH 1/3] Add service account credentialing (#997)

* Add service account credentialing

* ruff
---
 v03_pipeline/lib/misc/gcp.py                      | 36 +++++++++++++++++++
 .../tasks/dataproc/create_dataproc_cluster.py     |  5 +++
 .../dataproc/create_dataproc_cluster_test.py      | 18 +++++++++-
 3 files changed, 58 insertions(+), 1 deletion(-)
 create mode 100644 v03_pipeline/lib/misc/gcp.py

diff --git a/v03_pipeline/lib/misc/gcp.py b/v03_pipeline/lib/misc/gcp.py
new file mode 100644
index 000000000..c22113bf1
--- /dev/null
+++ b/v03_pipeline/lib/misc/gcp.py
@@ -0,0 +1,36 @@
+import datetime
+
+import google.auth
+import google.auth.transport.requests
+import google.oauth2.credentials
+import pytz
+
+SERVICE_ACCOUNT_CREDENTIALS = None
+SOCIAL_AUTH_GOOGLE_OAUTH2_SCOPE = [
+    'https://www.googleapis.com/auth/userinfo.profile',
+    'https://www.googleapis.com/auth/userinfo.email',
+    'openid',
+]
+ONE_MINUTE_S = 60
+
+
+def get_service_account_credentials() -> google.oauth2.credentials.Credentials:
+    global SERVICE_ACCOUNT_CREDENTIALS
+    if not SERVICE_ACCOUNT_CREDENTIALS:
+        SERVICE_ACCOUNT_CREDENTIALS, _ = google.auth.default(
+            scopes=SOCIAL_AUTH_GOOGLE_OAUTH2_SCOPE,
+        )
+    tz = pytz.UTC
+    if (
+        SERVICE_ACCOUNT_CREDENTIALS.token
+        and (
+            tz.localize(SERVICE_ACCOUNT_CREDENTIALS.expiry)
+            - datetime.datetime.now(tz=tz)
+        ).total_seconds()
+        > ONE_MINUTE_S
+    ):
+        return SERVICE_ACCOUNT_CREDENTIALS
+    SERVICE_ACCOUNT_CREDENTIALS.refresh(
+        request=google.auth.transport.requests.Request(),
+    )
+    return SERVICE_ACCOUNT_CREDENTIALS
diff --git a/v03_pipeline/lib/tasks/dataproc/create_dataproc_cluster.py b/v03_pipeline/lib/tasks/dataproc/create_dataproc_cluster.py
index 3a1d6c9da..f7d0bc21d 100644
--- a/v03_pipeline/lib/tasks/dataproc/create_dataproc_cluster.py
+++ b/v03_pipeline/lib/tasks/dataproc/create_dataproc_cluster.py
@@ -6,6 +6,7 @@
 from pip._internal.operations import freeze as pip_freeze
 
 from v03_pipeline.lib.logger import get_logger
+from v03_pipeline.lib.misc.gcp import get_service_account_credentials
 from v03_pipeline.lib.model import Env, ReferenceGenome
 from v03_pipeline.lib.tasks.base.base_loading_pipeline_params import (
     BaseLoadingPipelineParams,
@@ -22,9 +23,11 @@
 
 
 def get_cluster_config(reference_genome: ReferenceGenome, run_id: str):
+    service_account_credentials = get_service_account_credentials()
     return {
         'project_id': Env.GCLOUD_PROJECT,
         'cluster_name': f'{CLUSTER_NAME_PREFIX}-{reference_genome.value.lower()}-{run_id}',
+        # Schema found at https://cloud.google.com/dataproc/docs/reference/rest/v1/ClusterConfig
         'config': {
             'gce_cluster_config': {
                 'zone_uri': Env.GCLOUD_ZONE,
@@ -35,6 +38,8 @@
                 'REFERENCE_GENOME': reference_genome.value,
                 'PIPELINE_RUNNER_APP_VERSION': Env.PIPELINE_RUNNER_APP_VERSION,
             },
+            'service_account': service_account_credentials.service_account_email,
+            'service_account_scopes': service_account_credentials.scopes,
         },
         'master_config': {
             'num_instances': 1,
diff --git a/v03_pipeline/lib/tasks/dataproc/create_dataproc_cluster_test.py b/v03_pipeline/lib/tasks/dataproc/create_dataproc_cluster_test.py
index bb447e0d6..c7f4f2958 100644
--- a/v03_pipeline/lib/tasks/dataproc/create_dataproc_cluster_test.py
+++ b/v03_pipeline/lib/tasks/dataproc/create_dataproc_cluster_test.py
@@ -5,17 +5,29 @@
 import google.api_core.exceptions
 import luigi
 
+from v03_pipeline.lib.misc.gcp import 
SOCIAL_AUTH_GOOGLE_OAUTH2_SCOPE
 from v03_pipeline.lib.model import DatasetType, ReferenceGenome
 from v03_pipeline.lib.tasks.dataproc.create_dataproc_cluster import (
     CreateDataprocClusterTask,
 )
 
 
+@patch(
+    'v03_pipeline.lib.tasks.dataproc.create_dataproc_cluster.get_service_account_credentials',
+    return_value=SimpleNamespace(
+        service_account_email='test@serviceaccount.com',
+        scopes=SOCIAL_AUTH_GOOGLE_OAUTH2_SCOPE,
+    ),
+)
 @patch(
     'v03_pipeline.lib.tasks.dataproc.create_dataproc_cluster.dataproc.ClusterControllerClient',
 )
 class CreateDataprocClusterTaskTest(unittest.TestCase):
-    def test_dataset_type_unsupported(self, mock_cluster_controller: Mock) -> None:
+    def test_dataset_type_unsupported(
+        self,
+        mock_cluster_controller: Mock,
+        _: Mock,
+    ) -> None:
         worker = luigi.worker.Worker()
         task = CreateDataprocClusterTask(
             reference_genome=ReferenceGenome.GRCh38,
@@ -29,6 +41,7 @@
     def test_spinup_cluster_already_exists_failed(
         self,
         mock_cluster_controller: Mock,
+        _: Mock,
     ) -> None:
         mock_client = mock_cluster_controller.return_value
         mock_client.get_cluster.return_value = SimpleNamespace(
@@ -50,6 +63,7 @@
     def test_spinup_cluster_already_exists_success(
         self,
         mock_cluster_controller: Mock,
+        _: Mock,
     ) -> None:
         mock_client = mock_cluster_controller.return_value
         mock_client.get_cluster.return_value = SimpleNamespace(
@@ -73,6 +87,7 @@ def test_spinup_cluster_doesnt_exist_failed(
         self,
         mock_logger: Mock,
         mock_cluster_controller: Mock,
+        _: Mock,
     ) -> None:
         mock_client = mock_cluster_controller.return_value
         mock_client.get_cluster.side_effect = google.api_core.exceptions.NotFound(
@@ -98,6 +113,7 @@ def test_spinup_cluster_doesnt_exist_success(
         self,
         mock_logger: Mock,
         mock_cluster_controller: Mock,
+        _: Mock,
     ) -> None:
         mock_client = mock_cluster_controller.return_value
         mock_client.get_cluster.side_effect = google.api_core.exceptions.NotFound(

From b98fe664c8733e8edb0faf39ff042827a362a93d Mon Sep 17 00:00:00 2001
From: Benjamin Blankenmeister
Date: Fri, 13 Dec 2024 11:13:20 -0500
Subject: [PATCH 2/3] feat: Handle parsing empty predicted sex into Unknown (#1000)

---
 v03_pipeline/lib/misc/io.py                          |  6 +++++-
 v03_pipeline/lib/misc/io_test.py                     |  1 +
 v03_pipeline/lib/model/definitions.py                | 10 +++++-----
 v03_pipeline/var/test/sex_check/test_imputed_sex.tsv |  1 +
 4 files changed, 12 insertions(+), 6 deletions(-)

diff --git a/v03_pipeline/lib/misc/io.py b/v03_pipeline/lib/misc/io.py
index 865cdf5b3..cc31ffba1 100644
--- a/v03_pipeline/lib/misc/io.py
+++ b/v03_pipeline/lib/misc/io.py
@@ -219,7 +219,11 @@ def select_relevant_fields(
 def import_imputed_sex(imputed_sex_path: str) -> hl.Table:
     ht = hl.import_table(imputed_sex_path)
     imputed_sex_lookup = hl.dict(
-        {s.imputed_sex_value: s.value for s in Sex},
+        {
+            imputed_sex_value: s.value
+            for s in Sex
+            for imputed_sex_value in s.imputed_sex_values
+        },
     )
     ht = ht.select(
         s=ht.collaborator_sample_id,
diff --git a/v03_pipeline/lib/misc/io_test.py b/v03_pipeline/lib/misc/io_test.py
index f392c5c76..c9b4f3bce 100644
--- a/v03_pipeline/lib/misc/io_test.py
+++ b/v03_pipeline/lib/misc/io_test.py
@@ -47,6 +47,7 @@ def test_import_imputed_sex(self) -> None:
                 hl.Struct(s='abc_2', predicted_sex='F'),
                 hl.Struct(s='abc_3', predicted_sex='U'),
                 hl.Struct(s='abc_4', predicted_sex='XYY'),
+                hl.Struct(s='abc_5', predicted_sex='U'),
             ],
         )
 
diff --git a/v03_pipeline/lib/model/definitions.py b/v03_pipeline/lib/model/definitions.py
index 
a87437c8f..09b10fcbd 100644 --- a/v03_pipeline/lib/model/definitions.py +++ b/v03_pipeline/lib/model/definitions.py @@ -18,12 +18,12 @@ class Sex(str, Enum): XYY = 'XYY' @property - def imputed_sex_value(self): + def imputed_sex_values(self) -> list[str]: return { - Sex.MALE: 'Male', - Sex.FEMALE: 'Female', - Sex.UNKNOWN: 'Unknown', - }.get(self, self.name) + Sex.MALE: ['Male'], + Sex.FEMALE: ['Female'], + Sex.UNKNOWN: ['', 'Unknown'], + }.get(self, [self.name]) class PipelineVersion(str, Enum): diff --git a/v03_pipeline/var/test/sex_check/test_imputed_sex.tsv b/v03_pipeline/var/test/sex_check/test_imputed_sex.tsv index a22b0e26a..f0ae98e6c 100644 --- a/v03_pipeline/var/test/sex_check/test_imputed_sex.tsv +++ b/v03_pipeline/var/test/sex_check/test_imputed_sex.tsv @@ -3,3 +3,4 @@ SM-DM66X abc_1 abc_1 0E+00 gs://datarepo-9cafeffd-bucket/f511b131-3f0d-4eb7-a7f0 SM-DM69X abc_2 abc_2 0E+00 gs://datarepo-556a9c15-bucket/2a4202b0-93f5-4ebe-8d2b-fd4cfb2b881d/c4c07edf-7735-4aa7-9283-7cb2607b60a2/GLE-5774-3-3.qc-coverage-region-1_coverage_metrics.csv gs://datarepo-556a9c15-bucket/2a4202b0-93f5-4ebe-8d2b-fd4cfb2b881d/dcd4c271-0249-47f1-8e91-81f74735c5a1/GLE-5774-3-3.cram.crai gs://datarepo-556a9c15-bucket/2a4202b0-93f5-4ebe-8d2b-fd4cfb2b881d/ec41ec06-673f-4fe2-a063-23dc5fe1dcce/GLE-5774-3-3.cram.md5sum gs://datarepo-556a9c15-bucket/2a4202b0-93f5-4ebe-8d2b-fd4cfb2b881d/aad0e270-2ad5-4f39-b968-9b4beafeb5cc/GLE-5774-3-3.cram a4b04a39-9234-4028-a155-442c4acf12a0 07.021.604.3.7.8 ce74d94c-c33d-49d7-85c9-5f3cbd08aff7 2024-04-17T15:02:46 99.800000000 gs://datarepo-556a9c15-bucket/2a4202b0-93f5-4ebe-8d2b-fd4cfb2b881d/c3a9e6f2-4c68-410b-823d-46ca406e5061/GLE-5774-3-3.mapping_metrics.csv DNA:DNA Genomic 35.300000000 Whole Blood:Whole Blood PT-24OHM Pass PDO-32755 96.320000000 97.340000000 Female P-WG-0139 2017-04-12 04:00:00 Female RP-3061 gs://datarepo-556a9c15-bucket/2a4202b0-93f5-4ebe-8d2b-fd4cfb2b881d/c71cd2a1-c789-4715-9ebc-dbfc40d9f2e2/GLE-5774-3-3.vcf.gz.tbi gs://datarepo-556a9c15-bucket/2a4202b0-93f5-4ebe-8d2b-fd4cfb2b881d/957a99cb-c9a9-4fc5-a0ec-53f9e461469e/GLE-5774-3-3.vcf.gz.md5sum gs://datarepo-556a9c15-bucket/2a4202b0-93f5-4ebe-8d2b-fd4cfb2b881d/df520949-5f2b-4976-9d46-80d1cc299813/GLE-5774-3-3.vcf.gz 133253714921.000000000 gs://datarepo-556a9c15-bucket/2a4202b0-93f5-4ebe-8d2b-fd4cfb2b881d/2e98e51b-9394-4e64-977f-e9010a4e16dc/GLE-5774-3-3.vc_metrics.csv SM-DPB5G abc_3 abc_3 0E+00 gs://datarepo-c41dc160-bucket/907593be-8862-4945-9e70-f758b6448b8d/432f8354-77e0-4381-9bb5-dfdc0633b5b2/PIE_OGI1433_002628_1.qc-coverage-region-1_coverage_metrics.csv gs://datarepo-c41dc160-bucket/907593be-8862-4945-9e70-f758b6448b8d/3dc623fa-2a45-4b3d-a0f8-fcdec09f9418/PIE_OGI1433_002628_1.cram.crai gs://datarepo-c41dc160-bucket/907593be-8862-4945-9e70-f758b6448b8d/895966ef-c705-4c18-952d-03863243a184/PIE_OGI1433_002628_1.cram.md5sum gs://datarepo-c41dc160-bucket/907593be-8862-4945-9e70-f758b6448b8d/96ca6d5f-fb23-4102-bb5e-c7bbfd194e1c/PIE_OGI1433_002628_1.cram ffb50687-165e-425a-a545-c3797d3a28d4 07.021.604.3.7.8 55729ba9-3ce4-47b3-9c3b-1148737ae40f 2024-04-17T15:07:57 99.670000000 gs://datarepo-c41dc160-bucket/907593be-8862-4945-9e70-f758b6448b8d/30f8e208-5d2d-4ce8-b835-695b5ed673f4/PIE_OGI1433_002628_1.mapping_metrics.csv DNA:DNA Genomic 41.910000000 Whole Blood:Whole Blood PT-25BR5 Pass PDO-32756 92.920000000 97.990000000 Unknown P-WG-0139 2017-05-19 04:00:00 Unknown RP-3062 gs://datarepo-c41dc160-bucket/907593be-8862-4945-9e70-f758b6448b8d/1641d1b2-1035-4cc3-9c8b-0c8cb430f56b/PIE_OGI1433_002628_1.vcf.gz.tbi 
gs://datarepo-c41dc160-bucket/907593be-8862-4945-9e70-f758b6448b8d/f5ba2708-899e-42e8-b287-fdf72c2e404d/PIE_OGI1433_002628_1.vcf.gz.md5sum gs://datarepo-c41dc160-bucket/907593be-8862-4945-9e70-f758b6448b8d/e925ee5d-a75e-471f-adfd-2756c8690069/PIE_OGI1433_002628_1.vcf.gz 156149580126.000000000 gs://datarepo-c41dc160-bucket/907593be-8862-4945-9e70-f758b6448b8d/df076bc5-9db8-44f0-a3fe-f693370634cc/PIE_OGI1433_002628_1.vc_metrics.csv SM-DPB5G abc_4 abc_4 0E+00 gs://datarepo-c41dc160-bucket/907593be-8862-4945-9e70-f758b6448b8d/432f8354-77e0-4381-9bb5-dfdc0633b5b2/PIE_OGI1433_002628_1.qc-coverage-region-1_coverage_metrics.csv gs://datarepo-c41dc160-bucket/907593be-8862-4945-9e70-f758b6448b8d/3dc623fa-2a45-4b3d-a0f8-fcdec09f9418/PIE_OGI1433_002628_1.cram.crai gs://datarepo-c41dc160-bucket/907593be-8862-4945-9e70-f758b6448b8d/895966ef-c705-4c18-952d-03863243a184/PIE_OGI1433_002628_1.cram.md5sum gs://datarepo-c41dc160-bucket/907593be-8862-4945-9e70-f758b6448b8d/96ca6d5f-fb23-4102-bb5e-c7bbfd194e1c/PIE_OGI1433_002628_1.cram ffb50687-165e-425a-a545-c3797d3a28d4 07.021.604.3.7.8 55729ba9-3ce4-47b3-9c3b-1148737ae40f 2024-04-17T15:07:57 99.670000000 gs://datarepo-c41dc160-bucket/907593be-8862-4945-9e70-f758b6448b8d/30f8e208-5d2d-4ce8-b835-695b5ed673f4/PIE_OGI1433_002628_1.mapping_metrics.csv DNA:DNA Genomic 41.910000000 Whole Blood:Whole Blood PT-25BR5 Pass PDO-32756 92.920000000 97.990000000 XYY P-WG-0139 2017-05-19 04:00:00 XYY RP-3062 gs://datarepo-c41dc160-bucket/907593be-8862-4945-9e70-f758b6448b8d/1641d1b2-1035-4cc3-9c8b-0c8cb430f56b/PIE_OGI1433_002628_1.vcf.gz.tbi gs://datarepo-c41dc160-bucket/907593be-8862-4945-9e70-f758b6448b8d/f5ba2708-899e-42e8-b287-fdf72c2e404d/PIE_OGI1433_002628_1.vcf.gz.md5sum gs://datarepo-c41dc160-bucket/907593be-8862-4945-9e70-f758b6448b8d/e925ee5d-a75e-471f-adfd-2756c8690069/PIE_OGI1433_002628_1.vcf.gz 156149580126.000000000 gs://datarepo-c41dc160-bucket/907593be-8862-4945-9e70-f758b6448b8d/df076bc5-9db8-44f0-a3fe-f693370634cc/PIE_OGI1433_002628_1.vc_metrics.csv +SM-DPB5G abc_5 abc_5 0E+00 gs://datarepo-c41dc160-bucket/907593be-8862-4945-9e70-f758b6448b8d/432f8354-77e0-4381-9bb5-dfdc0633b5b2/PIE_OGI1433_002628_1.qc-coverage-region-1_coverage_metrics.csv gs://datarepo-c41dc160-bucket/907593be-8862-4945-9e70-f758b6448b8d/3dc623fa-2a45-4b3d-a0f8-fcdec09f9418/PIE_OGI1433_002628_1.cram.crai gs://datarepo-c41dc160-bucket/907593be-8862-4945-9e70-f758b6448b8d/895966ef-c705-4c18-952d-03863243a184/PIE_OGI1433_002628_1.cram.md5sum gs://datarepo-c41dc160-bucket/907593be-8862-4945-9e70-f758b6448b8d/96ca6d5f-fb23-4102-bb5e-c7bbfd194e1c/PIE_OGI1433_002628_1.cram ffb50687-165e-425a-a545-c3797d3a28d4 07.021.604.3.7.8 55729ba9-3ce4-47b3-9c3b-1148737ae40f 2024-04-17T15:07:57 99.670000000 gs://datarepo-c41dc160-bucket/907593be-8862-4945-9e70-f758b6448b8d/30f8e208-5d2d-4ce8-b835-695b5ed673f4/PIE_OGI1433_002628_1.mapping_metrics.csv DNA:DNA Genomic 41.910000000 Whole Blood:Whole Blood PT-25BR5 Pass PDO-32756 92.920000000 97.990000000 P-WG-0139 2017-05-19 04:00:00 RP-3062 gs://datarepo-c41dc160-bucket/907593be-8862-4945-9e70-f758b6448b8d/1641d1b2-1035-4cc3-9c8b-0c8cb430f56b/PIE_OGI1433_002628_1.vcf.gz.tbi gs://datarepo-c41dc160-bucket/907593be-8862-4945-9e70-f758b6448b8d/f5ba2708-899e-42e8-b287-fdf72c2e404d/PIE_OGI1433_002628_1.vcf.gz.md5sum gs://datarepo-c41dc160-bucket/907593be-8862-4945-9e70-f758b6448b8d/e925ee5d-a75e-471f-adfd-2756c8690069/PIE_OGI1433_002628_1.vcf.gz 156149580126.000000000 
gs://datarepo-c41dc160-bucket/907593be-8862-4945-9e70-f758b6448b8d/df076bc5-9db8-44f0-a3fe-f693370634cc/PIE_OGI1433_002628_1.vc_metrics.csv From 8b58c01aa462f9b6ad35bc29954f124c03c26e67 Mon Sep 17 00:00:00 2001 From: Benjamin Blankenmeister Date: Fri, 13 Dec 2024 11:48:37 -0500 Subject: [PATCH 3/3] Add helper functions for querying `Terra Data Repository` (#998) * Add service account credentialing * ruff * First pass * tests passing * add coverage of bigquery test * change function names * use generators everywhere * bq requirement * resolver * Update sample id name * Build Sex Check Table from TDR Metrics (#999) --- requirements.in | 1 + requirements.txt | 22 +- v03_pipeline/lib/misc/allele_registry.py | 10 +- v03_pipeline/lib/misc/requests.py | 14 + .../lib/misc/terra_data_repository.py | 68 ++++ .../lib/misc/terra_data_repository_test.py | 303 ++++++++++++++++++ v03_pipeline/lib/model/dataset_type.py | 8 + v03_pipeline/lib/model/environment.py | 2 + v03_pipeline/lib/paths.py | 17 +- v03_pipeline/lib/paths_test.py | 10 +- .../lib/tasks/base/base_loading_run_params.py | 4 + .../lib/tasks/write_imported_callset.py | 12 + .../lib/tasks/write_sex_check_table.py | 24 +- .../lib/tasks/write_sex_check_table_test.py | 96 ++++++ .../lib/tasks/write_tdr_metrics_file.py | 35 ++ .../lib/tasks/write_tdr_metrics_files.py | 28 ++ 16 files changed, 623 insertions(+), 31 deletions(-) create mode 100644 v03_pipeline/lib/misc/requests.py create mode 100644 v03_pipeline/lib/misc/terra_data_repository.py create mode 100644 v03_pipeline/lib/misc/terra_data_repository_test.py create mode 100644 v03_pipeline/lib/tasks/write_sex_check_table_test.py create mode 100644 v03_pipeline/lib/tasks/write_tdr_metrics_file.py create mode 100644 v03_pipeline/lib/tasks/write_tdr_metrics_files.py diff --git a/requirements.in b/requirements.in index 6d5abb8b9..717740a88 100644 --- a/requirements.in +++ b/requirements.in @@ -4,3 +4,4 @@ gnomad==0.6.4 aiofiles==24.1.0 pydantic==2.8.2 google-cloud-dataproc==5.14.0 +google-cloud-bigquery==3.27.0 diff --git a/requirements.txt b/requirements.txt index 9762fbca9..86e9f058b 100644 --- a/requirements.txt +++ b/requirements.txt @@ -97,17 +97,30 @@ frozenlist==1.5.0 gnomad==0.6.4 # via -r requirements.in google-api-core[grpc]==2.22.0 - # via google-cloud-dataproc + # via + # google-cloud-bigquery + # google-cloud-core + # google-cloud-dataproc google-auth==2.35.0 # via # google-api-core # google-auth-oauthlib + # google-cloud-bigquery + # google-cloud-core # google-cloud-dataproc # hail google-auth-oauthlib==0.8.0 # via hail +google-cloud-bigquery==3.27.0 + # via -r requirements.in +google-cloud-core==2.4.1 + # via google-cloud-bigquery google-cloud-dataproc==5.14.0 # via -r requirements.in +google-crc32c==1.6.0 + # via google-resumable-media +google-resumable-media==2.7.2 + # via google-cloud-bigquery googleapis-common-protos[grpc]==1.65.0 # via # google-api-core @@ -197,6 +210,7 @@ orjson==3.10.10 packaging==24.1 # via # bokeh + # google-cloud-bigquery # plotly pandas==2.2.3 # via @@ -256,9 +270,7 @@ pygments==2.18.0 # ipython # rich pyjwt[crypto]==2.9.0 - # via - # msal - # pyjwt + # via msal pyspark==3.5.3 # via hail python-daemon==3.1.0 @@ -266,6 +278,7 @@ python-daemon==3.1.0 python-dateutil==2.9.0.post0 # via # botocore + # google-cloud-bigquery # luigi # pandas python-json-logger==2.0.7 @@ -282,6 +295,7 @@ requests==2.32.3 # via # azure-core # google-api-core + # google-cloud-bigquery # hail # msal # msrest diff --git a/v03_pipeline/lib/misc/allele_registry.py 
b/v03_pipeline/lib/misc/allele_registry.py
index 77023a9ba..6129d7e50 100644
--- a/v03_pipeline/lib/misc/allele_registry.py
+++ b/v03_pipeline/lib/misc/allele_registry.py
@@ -8,9 +8,9 @@
 import hailtop.fs as hfs
 import requests
 from requests import HTTPError
-from requests.adapters import HTTPAdapter, Retry
 
 from v03_pipeline.lib.logger import get_logger
+from v03_pipeline.lib.misc.requests import requests_retry_session
 from v03_pipeline.lib.model import Env, ReferenceGenome
 
 MAX_VARIANTS_PER_REQUEST = 1000000
@@ -96,13 +96,7 @@ def register_alleles(
     logger.info('Calling the ClinGen Allele Registry')
     with hfs.open(formatted_vcf_file_name, 'r') as vcf_in:
         data = vcf_in.read()
-        s = requests.Session()
-        retries = Retry(
-            total=5,
-            backoff_factor=1,
-            status_forcelist=[500, 502, 503, 504],
-        )
-        s.mount('https://', HTTPAdapter(max_retries=retries))
+        s = requests_retry_session()
         res = s.put(
             url=build_url(base_url, reference_genome),
             data=data,
diff --git a/v03_pipeline/lib/misc/requests.py b/v03_pipeline/lib/misc/requests.py
new file mode 100644
index 000000000..e0ee66f36
--- /dev/null
+++ b/v03_pipeline/lib/misc/requests.py
@@ -0,0 +1,14 @@
+import requests
+from requests.adapters import HTTPAdapter, Retry
+
+
+def requests_retry_session():
+    s = requests.Session()
+    retries = Retry(
+        total=5,
+        backoff_factor=1,
+        status_forcelist=[500, 502, 503, 504],
+    )
+    s.mount('http://', HTTPAdapter(max_retries=retries))
+    s.mount('https://', HTTPAdapter(max_retries=retries))
+    return s
diff --git a/v03_pipeline/lib/misc/terra_data_repository.py b/v03_pipeline/lib/misc/terra_data_repository.py
new file mode 100644
index 000000000..d55b3ff37
--- /dev/null
+++ b/v03_pipeline/lib/misc/terra_data_repository.py
@@ -0,0 +1,68 @@
+import os
+import re
+from collections.abc import Generator
+from concurrent.futures import ThreadPoolExecutor, as_completed
+
+import google.cloud.bigquery
+from google.cloud import bigquery
+
+from v03_pipeline.lib.misc.gcp import get_service_account_credentials
+from v03_pipeline.lib.misc.requests import requests_retry_session
+
+BIGQUERY_METRICS = [
+    'collaborator_sample_id',
+    'predicted_sex',
+]
+BIGQUERY_RESOURCE = 'bigquery'
+TABLE_NAME_VALIDATION_REGEX = r'datarepo-\w+.datarepo_\w+'
+TDR_ROOT_URL = 'https://data.terra.bio/api/repository/v1/'
+
+
+def _tdr_request(resource: str) -> dict:
+    service_account_token = get_service_account_credentials().token
+    s = requests_retry_session()
+    res = s.get(
+        url=os.path.join(TDR_ROOT_URL, resource),
+        headers={'Authorization': f'Bearer {service_account_token}'},
+        timeout=10,
+    )
+    res.raise_for_status()
+    return res.json()
+
+
+def _get_dataset_ids() -> list[str]:
+    res_body = _tdr_request('datasets')
+    items = res_body['items']
+    for item in items:
+        if not any(x['cloudResource'] == BIGQUERY_RESOURCE for x in item['storage']):
+            # Hard failure on purpose to prompt manual investigation.
+ msg = 'Datasets without bigquery sources are unsupported' + raise ValueError(msg) + return [x['id'] for x in items] + + +def gen_bq_table_names() -> Generator[str]: + with ThreadPoolExecutor(max_workers=5) as executor: + futures = [ + executor.submit( + _tdr_request, + f'datasets/{dataset_id}?include=ACCESS_INFORMATION', + ) + for dataset_id in _get_dataset_ids() + ] + for future in as_completed(futures): + result = future.result() + yield f"{result['accessInformation']['bigQuery']['projectId']}.{result['accessInformation']['bigQuery']['datasetName']}" + + +def bq_metrics_query(bq_table_name: str) -> google.cloud.bigquery.table.RowIterator: + if not re.match(TABLE_NAME_VALIDATION_REGEX, bq_table_name): + msg = f'{bq_table_name} does not match expected pattern' + raise ValueError(msg) + client = bigquery.Client() + return client.query_and_wait( + f""" + SELECT {','.join(BIGQUERY_METRICS)} + FROM `{bq_table_name}.sample` + """, # noqa: S608 + ) diff --git a/v03_pipeline/lib/misc/terra_data_repository_test.py b/v03_pipeline/lib/misc/terra_data_repository_test.py new file mode 100644 index 000000000..54646e193 --- /dev/null +++ b/v03_pipeline/lib/misc/terra_data_repository_test.py @@ -0,0 +1,303 @@ +import json +import os +import unittest +from types import SimpleNamespace +from unittest.mock import Mock, patch + +import responses + +from v03_pipeline.lib.misc.terra_data_repository import ( + TDR_ROOT_URL, + _get_dataset_ids, + gen_bq_table_names, +) + +TDR_DATASETS = [ + { + 'id': '2dc51ee0-a037-499d-a915-a7c20a0b216d', + 'name': 'RP_3053', + 'description': 'TGG_Ware_DRAGEN_hg38. Dataset automatically created and linked to: RP-3053', + 'defaultProfileId': '0a164b9a-2b8b-45d2-859e-a4e369b9cb4f', + 'createdDate': '2023-10-05T13:15:27.649760Z', + 'storage': [ + { + 'region': 'us-central1', + 'cloudResource': 'bigquery', + 'cloudPlatform': 'gcp', + }, + { + 'region': 'us-east4', + 'cloudResource': 'firestore', + 'cloudPlatform': 'gcp', + }, + { + 'region': 'us-central1', + 'cloudResource': 'bucket', + 'cloudPlatform': 'gcp', + }, + ], + 'secureMonitoringEnabled': False, + 'cloudPlatform': 'gcp', + 'dataProject': 'datarepo-7242affb', + 'storageAccount': None, + 'phsId': None, + 'selfHosted': False, + 'predictableFileIds': False, + 'tags': [], + 'resourceLocks': { + 'exclusive': None, + 'shared': [], + }, + }, + { + 'id': 'beef77e8-575b-40e5-9340-a6f10e0bec67', + 'name': 'RP_3056', + 'description': 'RGP_HMB_DRAGEN_hg38. Dataset automatically created and linked to: RP-3056', + 'defaultProfileId': '835dd05d-c603-4b0d-926f-d9143fd24549', + 'createdDate': '2023-10-05T20:15:27.622481Z', + 'storage': [ + { + 'region': 'us-central1', + 'cloudResource': 'bigquery', + 'cloudPlatform': 'gcp', + }, + { + 'region': 'us-east4', + 'cloudResource': 'firestore', + 'cloudPlatform': 'gcp', + }, + { + 'region': 'us-central1', + 'cloudResource': 'bucket', + 'cloudPlatform': 'gcp', + }, + ], + 'secureMonitoringEnabled': False, + 'cloudPlatform': 'gcp', + 'dataProject': 'datarepo-5a72e31b', + 'storageAccount': None, + 'phsId': None, + 'selfHosted': False, + 'predictableFileIds': False, + 'tags': [], + 'resourceLocks': { + 'exclusive': None, + 'shared': [], + }, + }, + { + 'id': 'c8d74ac4-9a2e-4d3d-a6b5-1f4b433d949f', + 'name': 'RP_3055', + 'description': 'RGP_GRU_DRAGEN_hg38. 
Dataset automatically created and linked to: RP-3055', + 'defaultProfileId': '835dd05d-c603-4b0d-926f-d9143fd24549', + 'createdDate': '2023-10-05T20:15:28.255304Z', + 'storage': [ + { + 'region': 'us-central1', + 'cloudResource': 'bigquery', + 'cloudPlatform': 'gcp', + }, + { + 'region': 'us-east4', + 'cloudResource': 'firestore', + 'cloudPlatform': 'gcp', + }, + { + 'region': 'us-central1', + 'cloudResource': 'bucket', + 'cloudPlatform': 'gcp', + }, + ], + 'secureMonitoringEnabled': False, + 'cloudPlatform': 'gcp', + 'dataProject': 'datarepo-0f5be351', + 'storageAccount': None, + 'phsId': None, + 'selfHosted': False, + 'predictableFileIds': False, + 'tags': [], + 'resourceLocks': { + 'exclusive': None, + 'shared': [ + 'Gw_KTeYRS1aLhRvapMLYLg', + 'WrP-0w1aROOUbgkI8JS6Ug', + ], + }, + }, +] + + +@patch( + 'v03_pipeline.lib.misc.terra_data_repository.get_service_account_credentials', + return_value=SimpleNamespace( + token='abcdefg', # noqa: S106 + ), +) +class TerraDataRepositoryTest(unittest.TestCase): + @responses.activate + def test_get_dataset_ids(self, _: Mock) -> None: + responses.get( + os.path.join(TDR_ROOT_URL, 'datasets'), + body=json.dumps( + { + 'total': 3, + 'filteredTotal': 3, + 'items': TDR_DATASETS, + }, + ), + ) + self.assertListEqual( + _get_dataset_ids(), + [ + '2dc51ee0-a037-499d-a915-a7c20a0b216d', + 'beef77e8-575b-40e5-9340-a6f10e0bec67', + 'c8d74ac4-9a2e-4d3d-a6b5-1f4b433d949f', + ], + ) + + @responses.activate + def test_get_dataset_ids_no_bq(self, _: Mock) -> None: + responses.get( + os.path.join(TDR_ROOT_URL, 'datasets'), + body=json.dumps( + { + 'total': 1, + 'filteredTotal': 1, + 'items': [ + { + 'id': '2dc51ee0-a037-499d-a915-a7c20a0b216d', + 'name': 'RP_3053', + 'description': 'TGG_Ware_DRAGEN_hg38. Dataset automatically created and linked to: RP-3053', + 'defaultProfileId': '0a164b9a-2b8b-45d2-859e-a4e369b9cb4f', + 'createdDate': '2023-10-05T13:15:27.649760Z', + 'storage': [ + # NB: bigquery was removed from 'storage' here. + { + 'region': 'us-east4', + 'cloudResource': 'firestore', + 'cloudPlatform': 'gcp', + }, + { + 'region': 'us-central1', + 'cloudResource': 'bucket', + 'cloudPlatform': 'gcp', + }, + ], + 'secureMonitoringEnabled': False, + 'cloudPlatform': 'gcp', + 'dataProject': 'datarepo-7242affb', + 'storageAccount': None, + 'phsId': None, + 'selfHosted': False, + 'predictableFileIds': False, + 'tags': [], + 'resourceLocks': { + 'exclusive': None, + 'shared': [], + }, + }, + ], + }, + ), + ) + self.assertRaises( + ValueError, + _get_dataset_ids, + ) + + @responses.activate + def test_gen_bq_table_names(self, _: Mock) -> None: + responses.get( + os.path.join(TDR_ROOT_URL, 'datasets'), + body=json.dumps( + { + 'total': 3, + 'filteredTotal': 3, + 'items': TDR_DATASETS, + }, + ), + ) + for dataset_id, name, project_name, dataset_name in [ + ( + '2dc51ee0-a037-499d-a915-a7c20a0b216d', + 'RP_3053', + 'datarepo-7242affb', + 'datarepo_RP_3053', + ), + ( + 'beef77e8-575b-40e5-9340-a6f10e0bec67', + 'RP_3056', + 'datarepo-5a72e31b', + 'datarepo_RP_3056', + ), + ( + 'c8d74ac4-9a2e-4d3d-a6b5-1f4b433d949f', + 'RP_3059', + 'datarepo-aada2e3b', + 'datarepo_RP_3059', + ), + ]: + responses.get( + os.path.join( + TDR_ROOT_URL, + f'datasets/{dataset_id}?include=ACCESS_INFORMATION', + ), + body=json.dumps( + { + 'id': dataset_id, + 'name': name, + 'description': 'TGG_Ware_DRAGEN_hg38. 
Dataset automatically created and linked to: RP-3053', + 'defaultProfileId': None, + 'dataProject': None, + 'defaultSnapshotId': None, + 'schema': None, + 'createdDate': '2023-10-05T13:15:27.649760Z', + 'storage': None, + 'secureMonitoringEnabled': False, + 'phsId': None, + 'accessInformation': { + 'bigQuery': { + 'datasetName': dataset_name, + 'datasetId': f'{project_name}:{dataset_name}', + 'projectId': project_name, + 'link': 'https://console.cloud.google.com/bigquery?project=datarepo-7242affb&ws=!datarepo_RP_3053&d=datarepo_RP_3053&p=datarepo-7242affb&page=dataset', + 'tables': [ + { + 'name': 'jointcallset', + 'id': f'{project_name}.{dataset_name}.jointcallset', + 'qualifiedName': 'datarepo-7242affb.datarepo_RP_3053.jointcallset', + 'link': 'https://console.cloud.google.com/bigquery?project=datarepo-7242affb&ws=!datarepo_RP_3053&d=datarepo_RP_3053&p=datarepo-7242affb&page=table&t=jointcallset', + 'sampleQuery': 'SELECT * FROM `datarepo-7242affb.datarepo_RP_3053.jointcallset`', + }, + { + 'name': 'sample', + 'id': f'{project_name}.{dataset_name}.sample', + 'qualifiedName': 'datarepo-7242affb.datarepo_RP_3053.sample', + 'link': 'https://console.cloud.google.com/bigquery?project=datarepo-7242affb&ws=!datarepo_RP_3053&d=datarepo_RP_3053&p=datarepo-7242affb&page=table&t=sample', + 'sampleQuery': 'SELECT * FROM `datarepo-7242affb.datarepo_RP_3053.sample`', + }, + ], + }, + 'parquet': None, + }, + 'cloudPlatform': None, + 'selfHosted': False, + 'properties': None, + 'ingestServiceAccount': None, + 'predictableFileIds': False, + 'tags': [], + 'resourceLocks': { + 'exclusive': None, + 'shared': [], + }, + }, + ), + ) + self.assertCountEqual( + list(gen_bq_table_names()), + [ + 'datarepo-7242affb.datarepo_RP_3053', + 'datarepo-5a72e31b.datarepo_RP_3056', + 'datarepo-aada2e3b.datarepo_RP_3059', + ], + ) diff --git a/v03_pipeline/lib/model/dataset_type.py b/v03_pipeline/lib/model/dataset_type.py index 281cffd9b..e7e8a28d3 100644 --- a/v03_pipeline/lib/model/dataset_type.py +++ b/v03_pipeline/lib/model/dataset_type.py @@ -186,6 +186,14 @@ def expect_filters( ) -> bool: return self == DatasetType.SNV_INDEL and sample_type == SampleType.WES + def expect_tdr_metrics( + self, + reference_genome: ReferenceGenome, + ) -> bool: + return ( + self == DatasetType.SNV_INDEL and reference_genome == ReferenceGenome.GRCh38 + ) + @property def has_gencode_gene_symbol_to_gene_id_mapping(self) -> bool: return self == DatasetType.SV diff --git a/v03_pipeline/lib/model/environment.py b/v03_pipeline/lib/model/environment.py index d95d7a27c..e307b0226 100644 --- a/v03_pipeline/lib/model/environment.py +++ b/v03_pipeline/lib/model/environment.py @@ -56,6 +56,7 @@ os.environ.get('ACCESS_PRIVATE_REFERENCE_DATASETS') == '1' ) CHECK_SEX_AND_RELATEDNESS = os.environ.get('CHECK_SEX_AND_RELATEDNESS') == '1' +EXPECT_TDR_METRICS = os.environ.get('EXPECT_TDR_METRICS') == '1' EXPECT_WES_FILTERS = os.environ.get('EXPECT_WES_FILTERS') == '1' INCLUDE_PIPELINE_VERSION_IN_PREFIX = ( os.environ.get('INCLUDE_PIPELINE_VERSION_IN_PREFIX') == '1' @@ -72,6 +73,7 @@ class Env: CLINGEN_ALLELE_REGISTRY_LOGIN: str | None = CLINGEN_ALLELE_REGISTRY_LOGIN CLINGEN_ALLELE_REGISTRY_PASSWORD: str | None = CLINGEN_ALLELE_REGISTRY_PASSWORD DEPLOYMENT_TYPE: Literal['dev', 'prod'] = DEPLOYMENT_TYPE + EXPECT_TDR_METRICS: bool = EXPECT_TDR_METRICS EXPECT_WES_FILTERS: bool = EXPECT_WES_FILTERS GCLOUD_DATAPROC_SECONDARY_WORKERS: str = GCLOUD_DATAPROC_SECONDARY_WORKERS GCLOUD_PROJECT: str | None = GCLOUD_PROJECT diff --git a/v03_pipeline/lib/paths.py 
b/v03_pipeline/lib/paths.py index 68d27bba2..5a54880fd 100644 --- a/v03_pipeline/lib/paths.py +++ b/v03_pipeline/lib/paths.py @@ -98,10 +98,9 @@ def family_table_path( ) -def imputed_sex_path( +def tdr_metrics_dir( reference_genome: ReferenceGenome, dataset_type: DatasetType, - callset_path: str, ) -> str: return os.path.join( _pipeline_prefix( @@ -109,8 +108,18 @@ def imputed_sex_path( reference_genome, dataset_type, ), - 'imputed_sex', - f'{hashlib.sha256(callset_path.encode("utf8")).hexdigest()}.tsv', + 'tdr_metrics', + ) + + +def tdr_metrics_path( + reference_genome: ReferenceGenome, + dataset_type: DatasetType, + bq_table_name: str, +) -> str: + return os.path.join( + tdr_metrics_dir(reference_genome, dataset_type), + f'{bq_table_name}.tsv', ) diff --git a/v03_pipeline/lib/paths_test.py b/v03_pipeline/lib/paths_test.py index f49e62b61..8c5f097f4 100644 --- a/v03_pipeline/lib/paths_test.py +++ b/v03_pipeline/lib/paths_test.py @@ -9,7 +9,6 @@ from v03_pipeline.lib.paths import ( family_table_path, imported_callset_path, - imputed_sex_path, lookup_table_path, metadata_for_run_path, new_variants_table_path, @@ -19,6 +18,7 @@ relatedness_check_table_path, remapped_and_subsetted_callset_path, sex_check_table_path, + tdr_metrics_path, valid_filters_path, validation_errors_for_run_path, variant_annotations_table_path, @@ -177,14 +177,14 @@ def test_imported_callset_path(self) -> None: '/var/seqr/seqr-loading-temp/v3.1/GRCh38/SNV_INDEL/imported_callsets/ead56bb177a5de24178e1e622ce1d8beb3f8892bdae1c925d22ca0af4013d6dd.mt', ) - def test_imputed_sex_path(self) -> None: + def test_tdr_metrics_path(self) -> None: self.assertEqual( - imputed_sex_path( + tdr_metrics_path( ReferenceGenome.GRCh38, DatasetType.SNV_INDEL, - 'gs://abc.efg/callset.vcf.gz', + 'datarepo-7242affb.datarepo_RP_3053', ), - '/var/seqr/seqr-loading-temp/v3.1/GRCh38/SNV_INDEL/imputed_sex/ead56bb177a5de24178e1e622ce1d8beb3f8892bdae1c925d22ca0af4013d6dd.tsv', + '/var/seqr/seqr-loading-temp/v3.1/GRCh38/SNV_INDEL/tdr_metrics/datarepo-7242affb.datarepo_RP_3053.tsv', ) def test_new_variants_table_path(self) -> None: diff --git a/v03_pipeline/lib/tasks/base/base_loading_run_params.py b/v03_pipeline/lib/tasks/base/base_loading_run_params.py index 7c79b00d6..48a70ec6f 100644 --- a/v03_pipeline/lib/tasks/base/base_loading_run_params.py +++ b/v03_pipeline/lib/tasks/base/base_loading_run_params.py @@ -34,6 +34,10 @@ class BaseLoadingRunParams(luigi.Task): default=False, parsing=luigi.BoolParameter.EXPLICIT_PARSING, ) + skip_expect_tdr_metrics = luigi.BoolParameter( + default=False, + parsing=luigi.BoolParameter.EXPLICIT_PARSING, + ) skip_validation = luigi.BoolParameter( default=False, parsing=luigi.BoolParameter.EXPLICIT_PARSING, diff --git a/v03_pipeline/lib/tasks/write_imported_callset.py b/v03_pipeline/lib/tasks/write_imported_callset.py index 90bba9ebf..9caa71e58 100644 --- a/v03_pipeline/lib/tasks/write_imported_callset.py +++ b/v03_pipeline/lib/tasks/write_imported_callset.py @@ -22,6 +22,7 @@ from v03_pipeline.lib.tasks.base.base_loading_run_params import BaseLoadingRunParams from v03_pipeline.lib.tasks.base.base_write import BaseWriteTask from v03_pipeline.lib.tasks.files import CallsetTask, GCSorLocalTarget +from v03_pipeline.lib.tasks.write_tdr_metrics_files import WriteTDRMetricsFilesTask from v03_pipeline.lib.tasks.write_validation_errors_for_run import ( WriteValidationErrorsForRunTask, ) @@ -60,6 +61,17 @@ def requires(self) -> list[luigi.Task]: ), ), ] + if ( + Env.EXPECT_TDR_METRICS + and not self.skip_expect_tdr_metrics + and 
self.dataset_type.expect_tdr_metrics( + self.reference_genome, + ) + ): + requirements = [ + *requirements, + self.clone(WriteTDRMetricsFilesTask), + ] return [ *requirements, CallsetTask(self.callset_path), diff --git a/v03_pipeline/lib/tasks/write_sex_check_table.py b/v03_pipeline/lib/tasks/write_sex_check_table.py index b8b4bb9e7..f87233507 100644 --- a/v03_pipeline/lib/tasks/write_sex_check_table.py +++ b/v03_pipeline/lib/tasks/write_sex_check_table.py @@ -1,10 +1,12 @@ import hail as hl +import hailtop.fs as hfs import luigi from v03_pipeline.lib.misc.io import import_imputed_sex -from v03_pipeline.lib.paths import imputed_sex_path, sex_check_table_path +from v03_pipeline.lib.paths import sex_check_table_path, tdr_metrics_dir from v03_pipeline.lib.tasks.base.base_write import BaseWriteTask -from v03_pipeline.lib.tasks.files import GCSorLocalTarget, RawFileTask +from v03_pipeline.lib.tasks.files import GCSorLocalTarget +from v03_pipeline.lib.tasks.write_tdr_metrics_files import WriteTDRMetricsFilesTask class WriteSexCheckTableTask(BaseWriteTask): @@ -20,13 +22,15 @@ def output(self) -> luigi.Target: ) def requires(self) -> luigi.Task: - return RawFileTask( - imputed_sex_path( - self.reference_genome, - self.dataset_type, - self.callset_path, - ), - ) + return self.clone(WriteTDRMetricsFilesTask) def create_table(self) -> hl.Table: - return import_imputed_sex(self.input().path) + ht = None + for tdr_metrics_file in hfs.ls( + tdr_metrics_dir(self.reference_genome, self.dataset_type), + ): + if not ht: + ht = import_imputed_sex(tdr_metrics_file.path) + continue + ht = ht.union(import_imputed_sex(tdr_metrics_file.path)) + return ht diff --git a/v03_pipeline/lib/tasks/write_sex_check_table_test.py b/v03_pipeline/lib/tasks/write_sex_check_table_test.py new file mode 100644 index 000000000..2c935f44c --- /dev/null +++ b/v03_pipeline/lib/tasks/write_sex_check_table_test.py @@ -0,0 +1,96 @@ +from unittest.mock import Mock, patch + +import google.cloud.bigquery +import hail as hl +import luigi.worker + +from v03_pipeline.lib.model import DatasetType, ReferenceGenome +from v03_pipeline.lib.paths import sex_check_table_path, tdr_metrics_path +from v03_pipeline.lib.tasks.write_sex_check_table import ( + WriteSexCheckTableTask, +) +from v03_pipeline.lib.test.mocked_dataroot_testcase import MockedDatarootTestCase + + +class WriteSexCheckTableTaskTest(MockedDatarootTestCase): + @patch('v03_pipeline.lib.tasks.write_tdr_metrics_files.gen_bq_table_names') + @patch('v03_pipeline.lib.tasks.write_tdr_metrics_file.bq_metrics_query') + def test_snv_sex_check_table_task( + self, + mock_bq_metrics_query: Mock, + mock_gen_bq_table_names: Mock, + ) -> None: + mock_gen_bq_table_names.return_value = [ + 'datarepo-7242affb.datarepo_RP_3053', + 'datarepo-5a72e31b.datarepo_RP_3056', + ] + mock_bq_metrics_query.side_effect = [ + iter( + [ + google.cloud.bigquery.table.Row( + ('SM-NJ8MF', 'Unknown'), + {'collaborator_sample_id': 0, 'predicted_sex': 1}, + ), + google.cloud.bigquery.table.Row( + ('SM-MWOGC', 'Female'), + {'collaborator_sample_id': 0, 'predicted_sex': 1}, + ), + google.cloud.bigquery.table.Row( + ('SM-MWKWL', 'Male'), + {'collaborator_sample_id': 0, 'predicted_sex': 1}, + ), + ], + ), + iter( + [ + google.cloud.bigquery.table.Row( + ('SM-NGE65', 'Male'), + {'collaborator_sample_id': 0, 'predicted_sex': 1}, + ), + google.cloud.bigquery.table.Row( + ('SM-NGE5G', 'Male'), + {'collaborator_sample_id': 0, 'predicted_sex': 1}, + ), + google.cloud.bigquery.table.Row( + ('SM-NC6LM', 'Male'), + 
                        {'collaborator_sample_id': 0, 'predicted_sex': 1},
+                    ),
+                ],
+            ),
+        ]
+        worker = luigi.worker.Worker()
+        write_sex_check_table = WriteSexCheckTableTask(
+            reference_genome=ReferenceGenome.GRCh38,
+            dataset_type=DatasetType.SNV_INDEL,
+            callset_path='na',
+        )
+        worker.add(write_sex_check_table)
+        worker.run()
+        self.assertTrue(write_sex_check_table.complete())
+        sex_check_ht = hl.read_table(
+            sex_check_table_path(
+                ReferenceGenome.GRCh38,
+                DatasetType.SNV_INDEL,
+                'na',
+            ),
+        )
+        self.assertEqual(
+            sex_check_ht.collect(),
+            [
+                hl.Struct(s='SM-MWKWL', predicted_sex='M'),
+                hl.Struct(s='SM-MWOGC', predicted_sex='F'),
+                hl.Struct(s='SM-NC6LM', predicted_sex='M'),
+                hl.Struct(s='SM-NGE5G', predicted_sex='M'),
+                hl.Struct(s='SM-NGE65', predicted_sex='M'),
+                hl.Struct(s='SM-NJ8MF', predicted_sex='U'),
+            ],
+        )
+        # Check underlying tdr metrics file.
+        with open(
+            tdr_metrics_path(
+                ReferenceGenome.GRCh38,
+                DatasetType.SNV_INDEL,
+                'datarepo-5a72e31b.datarepo_RP_3056',
+            ),
+        ) as f:
+            self.assertTrue('collaborator_sample_id' in f.read())
diff --git a/v03_pipeline/lib/tasks/write_tdr_metrics_file.py b/v03_pipeline/lib/tasks/write_tdr_metrics_file.py
new file mode 100644
index 000000000..b1c7ad9c0
--- /dev/null
+++ b/v03_pipeline/lib/tasks/write_tdr_metrics_file.py
@@ -0,0 +1,35 @@
+import csv
+
+import luigi
+import luigi.util
+
+from v03_pipeline.lib.misc.terra_data_repository import (
+    BIGQUERY_METRICS,
+    bq_metrics_query,
+)
+from v03_pipeline.lib.paths import tdr_metrics_path
+from v03_pipeline.lib.tasks.base.base_loading_pipeline_params import (
+    BaseLoadingPipelineParams,
+)
+from v03_pipeline.lib.tasks.files import GCSorLocalTarget
+
+
+@luigi.util.inherits(BaseLoadingPipelineParams)
+class WriteTDRMetricsFileTask(luigi.Task):
+    bq_table_name = luigi.Parameter()
+
+    def output(self) -> luigi.Target:
+        return GCSorLocalTarget(
+            tdr_metrics_path(
+                self.reference_genome,
+                self.dataset_type,
+                self.bq_table_name,
+            ),
+        )
+
+    def run(self):
+        with self.output().open('w') as f:
+            writer = csv.DictWriter(f, fieldnames=BIGQUERY_METRICS, delimiter='\t')
+            writer.writeheader()
+            for row in bq_metrics_query(self.bq_table_name):
+                writer.writerow(row)
diff --git a/v03_pipeline/lib/tasks/write_tdr_metrics_files.py b/v03_pipeline/lib/tasks/write_tdr_metrics_files.py
new file mode 100644
index 000000000..c2f0d773e
--- /dev/null
+++ b/v03_pipeline/lib/tasks/write_tdr_metrics_files.py
@@ -0,0 +1,28 @@
+import luigi
+import luigi.util
+
+from v03_pipeline.lib.misc.terra_data_repository import gen_bq_table_names
+from v03_pipeline.lib.tasks.base.base_loading_pipeline_params import (
+    BaseLoadingPipelineParams,
+)
+from v03_pipeline.lib.tasks.write_tdr_metrics_file import WriteTDRMetricsFileTask
+
+
+@luigi.util.inherits(BaseLoadingPipelineParams)
+class WriteTDRMetricsFilesTask(luigi.Task):
+    def __init__(self, *args, **kwargs):
+        super().__init__(*args, **kwargs)
+        self.dynamic_write_tdr_metrics_file_task = set()
+
+    def complete(self) -> bool:
+        return len(self.dynamic_write_tdr_metrics_file_task) >= 1 and all(
+            write_tdr_metrics_file_task.complete()
+            for write_tdr_metrics_file_task in self.dynamic_write_tdr_metrics_file_task
+        )
+
+    def run(self):
+        for bq_table_name in gen_bq_table_names():
+            self.dynamic_write_tdr_metrics_file_task.add(
+                self.clone(WriteTDRMetricsFileTask, bq_table_name=bq_table_name),
+            )
+        yield self.dynamic_write_tdr_metrics_file_task
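
The pieces added across these three patches fit together as follows: gen_bq_table_names() discovers the TDR-backed BigQuery datasets using the token from get_service_account_credentials(), bq_metrics_query() pulls the BIGQUERY_METRICS columns from each dataset's sample table, WriteTDRMetricsFilesTask and WriteTDRMetricsFileTask persist those rows as one TSV per table, and WriteSexCheckTableTask unions the TSVs into the sex-check Hail table. Below is a minimal standalone sketch of that query flow, not part of the patches themselves; it assumes the v03_pipeline modules added above are importable and that application-default credentials with Terra Data Repository and BigQuery access are configured, and it mirrors what WriteTDRMetricsFileTask.run does.

    # Sketch only (not part of the patch series): dump predicted_sex for every
    # sample in every accessible TDR dataset to stdout.
    # Assumes application-default credentials with TDR/BigQuery access.
    import csv
    import sys

    from v03_pipeline.lib.misc.terra_data_repository import (
        BIGQUERY_METRICS,
        bq_metrics_query,
        gen_bq_table_names,
    )

    writer = csv.DictWriter(sys.stdout, fieldnames=BIGQUERY_METRICS, delimiter='\t')
    writer.writeheader()
    for bq_table_name in gen_bq_table_names():
        # bq_metrics_query returns a BigQuery RowIterator; each Row behaves like a
        # mapping, so it can be passed to DictWriter.writerow directly, as the task does.
        for row in bq_metrics_query(bq_table_name):
            writer.writerow(row)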