From 73d166990a695f8f2651a69b18c5c6dbb1345112 Mon Sep 17 00:00:00 2001 From: Julia Klugherz Date: Wed, 9 Oct 2024 08:24:39 -0400 Subject: [PATCH 1/2] add task to write success file for run (#920) * add task to write success file for run * remove WriteProjectFamilyTablesTask from pipeline_worker * no hfs * add to init --- v03_pipeline/bin/pipeline_worker.py | 36 ++++++------------- v03_pipeline/lib/paths.py | 15 ++++++++ v03_pipeline/lib/tasks/__init__.py | 2 ++ v03_pipeline/lib/tasks/write_success_file.py | 36 +++++++++++++++++++ .../lib/tasks/write_success_file_test.py | 33 +++++++++++++++++ 5 files changed, 97 insertions(+), 25 deletions(-) create mode 100644 v03_pipeline/lib/tasks/write_success_file.py create mode 100644 v03_pipeline/lib/tasks/write_success_file_test.py diff --git a/v03_pipeline/bin/pipeline_worker.py b/v03_pipeline/bin/pipeline_worker.py index b2ecb22c4..e0e76985f 100755 --- a/v03_pipeline/bin/pipeline_worker.py +++ b/v03_pipeline/bin/pipeline_worker.py @@ -15,9 +15,9 @@ ) from v03_pipeline.lib.tasks import ( UpdateVariantAnnotationsTableWithNewSamplesTask, - WriteProjectFamilyTablesTask, ) from v03_pipeline.lib.tasks.trigger_hail_backend_reload import TriggerHailBackendReload +from v03_pipeline.lib.tasks.write_success_file import WriteSuccessFileTask logger = get_logger(__name__) @@ -47,39 +47,25 @@ def main(): ) for project_guid in lpr.projects_to_run ] - task_kwargs = { - k: v for k, v in lpr.model_dump().items() if k != 'projects_to_run' - } run_id = datetime.datetime.now(datetime.timezone.utc).strftime( '%Y%m%d-%H%M%S', ) + loading_run_task_params = { + 'project_guids': lpr.projects_to_run, + 'project_remap_paths': project_remap_paths, + 'project_pedigree_paths': project_pedigree_paths, + 'run_id': run_id, + **{k: v for k, v in lpr.model_dump().items() if k != 'projects_to_run'}, + } tasks = [ UpdateVariantAnnotationsTableWithNewSamplesTask( - project_guids=lpr.projects_to_run, - project_remap_paths=project_remap_paths, - project_pedigree_paths=project_pedigree_paths, - run_id=run_id, - **task_kwargs, + **loading_run_task_params, ), - *[ - WriteProjectFamilyTablesTask( - project_guid=lpr.projects_to_run[i], - project_remap_path=project_remap_paths[i], - project_pedigree_path=project_pedigree_paths[i], - **task_kwargs, - ) - for i in range(len(lpr.projects_to_run)) - ], + WriteSuccessFileTask(**loading_run_task_params), ] if Env.SHOULD_TRIGGER_HAIL_BACKEND_RELOAD: tasks.append( - TriggerHailBackendReload( - project_guids=lpr.projects_to_run, - project_remap_paths=project_remap_paths, - project_pedigree_paths=project_pedigree_paths, - run_id=run_id, - **task_kwargs, - ), + TriggerHailBackendReload(**loading_run_task_params), ) luigi.build(tasks) except Exception: diff --git a/v03_pipeline/lib/paths.py b/v03_pipeline/lib/paths.py index 2c6b954bb..3969eeecf 100644 --- a/v03_pipeline/lib/paths.py +++ b/v03_pipeline/lib/paths.py @@ -366,3 +366,18 @@ def loading_pipeline_queue_path() -> str: 'loading_pipeline_queue', 'request.json', ) + + +def pipeline_run_success_file_path( + reference_genome: ReferenceGenome, + dataset_type: DatasetType, + run_id: str, +) -> str: + return os.path.join( + runs_path( + reference_genome, + dataset_type, + ), + run_id, + '_SUCCESS', + ) diff --git a/v03_pipeline/lib/tasks/__init__.py b/v03_pipeline/lib/tasks/__init__.py index 7d034326e..d4f1d6709 100644 --- a/v03_pipeline/lib/tasks/__init__.py +++ b/v03_pipeline/lib/tasks/__init__.py @@ -34,6 +34,7 @@ from v03_pipeline.lib.tasks.write_project_family_tables import ( WriteProjectFamilyTablesTask, ) +from v03_pipeline.lib.tasks.write_success_file import WriteSuccessFileTask __all__ = [ 'DeleteFamilyTableTask', @@ -52,4 +53,5 @@ 'UpdateVariantAnnotationsTableWithDeletedFamiliesTask', 'WriteMetadataForRunTask', 'WriteProjectFamilyTablesTask', + 'WriteSuccessFileTask', ] diff --git a/v03_pipeline/lib/tasks/write_success_file.py b/v03_pipeline/lib/tasks/write_success_file.py new file mode 100644 index 000000000..5e9880d8c --- /dev/null +++ b/v03_pipeline/lib/tasks/write_success_file.py @@ -0,0 +1,36 @@ +import luigi +import luigi.util + +from v03_pipeline.lib.paths import pipeline_run_success_file_path +from v03_pipeline.lib.tasks import WriteProjectFamilyTablesTask +from v03_pipeline.lib.tasks.base.base_project_info_params import ( + BaseLoadingRunWithProjectInfoParams, +) +from v03_pipeline.lib.tasks.files import GCSorLocalTarget + + +@luigi.util.inherits(BaseLoadingRunWithProjectInfoParams) +class WriteSuccessFileTask(luigi.Task): + def output(self) -> luigi.Target: + return GCSorLocalTarget( + pipeline_run_success_file_path( + self.reference_genome, + self.dataset_type, + self.run_id, + ), + ) + + def requires(self): + return [ + self.clone( + WriteProjectFamilyTablesTask, + project_guid=self.project_guids[i], + project_remap_path=self.project_remap_paths[i], + project_pedigree_path=self.project_pedigree_paths[i], + ) + for i in range(len(self.project_guids)) + ] + + def run(self): + with self.output().open('w') as f: + f.write('') diff --git a/v03_pipeline/lib/tasks/write_success_file_test.py b/v03_pipeline/lib/tasks/write_success_file_test.py new file mode 100644 index 000000000..a3ffd98b6 --- /dev/null +++ b/v03_pipeline/lib/tasks/write_success_file_test.py @@ -0,0 +1,33 @@ +from unittest import mock + +import luigi.worker + +from v03_pipeline.lib.model import DatasetType, ReferenceGenome, SampleType +from v03_pipeline.lib.tasks.write_success_file import WriteSuccessFileTask +from v03_pipeline.lib.test.mock_complete_task import MockCompleteTask +from v03_pipeline.lib.test.mocked_dataroot_testcase import MockedDatarootTestCase + + +class WriteSuccessFileTaskTest(MockedDatarootTestCase): + @mock.patch( + 'v03_pipeline.lib.tasks.write_success_file.WriteProjectFamilyTablesTask', + ) + def test_write_success_file_task(self, mock_write_project_fam_tables) -> None: + mock_write_project_fam_tables.return_value = MockCompleteTask() + + worker = luigi.worker.Worker() + write_success_file = WriteSuccessFileTask( + reference_genome=ReferenceGenome.GRCh38, + dataset_type=DatasetType.SNV_INDEL, + sample_type=SampleType.WGS, + callset_path='test_callset', + project_guids=['R0113_test_project'], + project_remap_paths=['test_remap'], + project_pedigree_paths=['test_pedigree'], + run_id='manual__2024-04-03', + ) + worker.add(write_success_file) + worker.run() + self.assertTrue(write_success_file.complete()) + with open(write_success_file.output().path) as f: + self.assertEqual(f.read(), '') From 2100276631be4484930aad7fbb6062b73b19a108 Mon Sep 17 00:00:00 2001 From: Benjamin Blankenmeister Date: Wed, 9 Oct 2024 12:01:16 -0400 Subject: [PATCH 2/2] var seqr (#922) * var seqr * ruff * comma * remove luigi config in order to move it to helm --- .../bin/download_vep_reference_data.bash | 2 +- v03_pipeline/bin/rsync_reference_data.bash | 6 ++-- v03_pipeline/bin/vep | 2 +- v03_pipeline/deploy/Dockerfile | 1 - v03_pipeline/lib/model/environment.py | 13 ++++--- v03_pipeline/lib/paths_test.py | 34 +++++++++---------- .../var/luigi_config/luigi-defaults.conf | 9 ----- 7 files changed, 30 insertions(+), 37 deletions(-) delete mode 100644 v03_pipeline/var/luigi_config/luigi-defaults.conf diff --git a/v03_pipeline/bin/download_vep_reference_data.bash b/v03_pipeline/bin/download_vep_reference_data.bash index c5583aa68..5ce6624ab 100755 --- a/v03_pipeline/bin/download_vep_reference_data.bash +++ b/v03_pipeline/bin/download_vep_reference_data.bash @@ -3,7 +3,7 @@ set -eux REFERENCE_GENOME=$1 -VEP_REFERENCE_DATASETS_DIR=${VEP_REFERENCE_DATASETS_DIR:-/seqr/vep-reference-data} +VEP_REFERENCE_DATASETS_DIR=${VEP_REFERENCE_DATASETS_DIR:-/var/seqr/vep-reference-data} case $REFERENCE_GENOME in GRCh38) diff --git a/v03_pipeline/bin/rsync_reference_data.bash b/v03_pipeline/bin/rsync_reference_data.bash index cc18cceb5..98a209f69 100755 --- a/v03_pipeline/bin/rsync_reference_data.bash +++ b/v03_pipeline/bin/rsync_reference_data.bash @@ -3,7 +3,7 @@ set -eux REFERENCE_GENOME=$1 -SEQR_REFERENCE_DATA=${SEQR_REFERENCE_DATA:-/seqr/seqr-reference-data} +REFERENCE_DATASETS_DIR=${REFERENCE_DATASETS_DIR:-/var/seqr/seqr-reference-data} case $REFERENCE_GENOME in @@ -16,5 +16,5 @@ case $REFERENCE_GENOME in exit 1 esac -mkdir -p $SEQR_REFERENCE_DATA/$REFERENCE_GENOME; -gsutil -m rsync -rd "gs://seqr-reference-data/v03/$REFERENCE_GENOME" $SEQR_REFERENCE_DATA/$REFERENCE_GENOME +mkdir -p $REFERENCE_DATASETS_DIR/$REFERENCE_GENOME; +gsutil -m rsync -rd "gs://seqr-reference-data/v03/$REFERENCE_GENOME" $REFERENCE_DATASETS_DIR/$REFERENCE_GENOME diff --git a/v03_pipeline/bin/vep b/v03_pipeline/bin/vep index 33996bf27..4fe2df912 100755 --- a/v03_pipeline/bin/vep +++ b/v03_pipeline/bin/vep @@ -3,7 +3,7 @@ set -eux REFERENCE_GENOME=$1 -VEP_REFERENCE_DATASETS_DIR=${VEP_REFERENCE_DATASETS_DIR:-/seqr/vep-reference-data} +VEP_REFERENCE_DATASETS_DIR=${VEP_REFERENCE_DATASETS_DIR:-/var/seqr/vep-reference-data} VEP_DOCKER_IMAGE="gcr.io/seqr-project/vep-docker-image" case $REFERENCE_GENOME in diff --git a/v03_pipeline/deploy/Dockerfile b/v03_pipeline/deploy/Dockerfile index 418656525..e2fe36e9a 100644 --- a/v03_pipeline/deploy/Dockerfile +++ b/v03_pipeline/deploy/Dockerfile @@ -21,7 +21,6 @@ COPY v03_pipeline/migrations migrations # Special paths COPY v03_pipeline/var/spark_config/spark-defaults.conf /usr/local/lib/python3.10/dist-packages/pyspark/conf/spark-defaults.conf -COPY v03_pipeline/var/luigi_config/luigi-defaults.conf /etc/luigi/luigi.cfg COPY v03_pipeline/bin/vep /vep diff --git a/v03_pipeline/lib/model/environment.py b/v03_pipeline/lib/model/environment.py index 81e227105..9db94937d 100644 --- a/v03_pipeline/lib/model/environment.py +++ b/v03_pipeline/lib/model/environment.py @@ -13,20 +13,23 @@ HAIL_TMP_DIR = os.environ.get('HAIL_TMP_DIR', '/tmp') # noqa: S108 HAIL_SEARCH_DATA_DIR = os.environ.get( 'HAIL_SEARCH_DATA_DIR', - '/seqr/seqr-hail-search-data', + '/var/seqr/seqr-hail-search-data', +) +LOADING_DATASETS_DIR = os.environ.get( + 'LOADING_DATASETS_DIR', + '/var/seqr/seqr-loading-temp', ) -LOADING_DATASETS_DIR = os.environ.get('LOADING_DATASETS_DIR', '/seqr/seqr-loading-temp') PRIVATE_REFERENCE_DATASETS_DIR = os.environ.get( 'PRIVATE_REFERENCE_DATASETS_DIR', - '/seqr/seqr-reference-data-private', + '/var/seqr/seqr-reference-data-private', ) REFERENCE_DATASETS_DIR = os.environ.get( 'REFERENCE_DATASETS_DIR', - '/seqr/seqr-reference-data', + '/var/seqr/seqr-reference-data', ) VEP_REFERENCE_DATASETS_DIR = os.environ.get( 'VEP_REFERENCE_DATASETS_DIR', - '/seqr/vep-reference-data', + '/var/seqr/vep-reference-data', ) HAIL_BACKEND_SERVICE_HOSTNAME = os.environ.get( 'HAIL_BACKEND_SERVICE_HOSTNAME', diff --git a/v03_pipeline/lib/paths_test.py b/v03_pipeline/lib/paths_test.py index 90505fee5..cc18bdbdd 100644 --- a/v03_pipeline/lib/paths_test.py +++ b/v03_pipeline/lib/paths_test.py @@ -37,7 +37,7 @@ def test_cached_reference_dataset_query_path(self) -> None: DatasetType.SNV_INDEL, CachedReferenceDatasetQuery.CLINVAR_PATH_VARIANTS, ), - '/seqr/seqr-reference-data/v03/GRCh38/SNV_INDEL/cached_reference_dataset_queries/clinvar_path_variants.ht', + '/var/seqr/seqr-reference-data/v03/GRCh38/SNV_INDEL/cached_reference_dataset_queries/clinvar_path_variants.ht', ) def test_family_table_path(self) -> None: @@ -48,7 +48,7 @@ def test_family_table_path(self) -> None: SampleType.WES, 'franklin', ), - '/seqr/seqr-hail-search-data/v3.1/GRCh37/SNV_INDEL/families/WES/franklin.ht', + '/var/seqr/seqr-hail-search-data/v3.1/GRCh37/SNV_INDEL/families/WES/franklin.ht', ) with patch('v03_pipeline.lib.paths.Env') as mock_env: mock_env.HAIL_SEARCH_DATA_DIR = 'gs://seqr-datasets/' @@ -90,7 +90,7 @@ def test_project_table_path(self) -> None: SampleType.WES, 'R0652_pipeline_test', ), - '/seqr/seqr-hail-search-data/v3.1/GRCh38/MITO/projects/WES/R0652_pipeline_test.ht', + '/var/seqr/seqr-hail-search-data/v3.1/GRCh38/MITO/projects/WES/R0652_pipeline_test.ht', ) def test_valid_reference_dataset_collection_path(self) -> None: @@ -110,7 +110,7 @@ def test_valid_reference_dataset_collection_path(self) -> None: DatasetType.SNV_INDEL, ReferenceDatasetCollection.HGMD, ), - '/seqr/seqr-reference-data-private/v03/GRCh38/SNV_INDEL/reference_datasets/hgmd.ht', + '/var/seqr/seqr-reference-data-private/v03/GRCh38/SNV_INDEL/reference_datasets/hgmd.ht', ) def test_lookup_table_path(self) -> None: @@ -119,7 +119,7 @@ def test_lookup_table_path(self) -> None: ReferenceGenome.GRCh37, DatasetType.SV, ), - '/seqr/seqr-hail-search-data/v3.1/GRCh37/SV/lookup.ht', + '/var/seqr/seqr-hail-search-data/v3.1/GRCh37/SV/lookup.ht', ) def test_sex_check_table_path(self) -> None: @@ -129,7 +129,7 @@ def test_sex_check_table_path(self) -> None: DatasetType.SNV_INDEL, 'gs://abc.efg/callset.vcf.gz', ), - '/seqr/seqr-loading-temp/v3.1/GRCh38/SNV_INDEL/sex_check/ead56bb177a5de24178e1e622ce1d8beb3f8892bdae1c925d22ca0af4013d6dd.ht', + '/var/seqr/seqr-loading-temp/v3.1/GRCh38/SNV_INDEL/sex_check/ead56bb177a5de24178e1e622ce1d8beb3f8892bdae1c925d22ca0af4013d6dd.ht', ) def test_relatedness_check_table_path(self) -> None: @@ -139,7 +139,7 @@ def test_relatedness_check_table_path(self) -> None: DatasetType.SNV_INDEL, 'gs://abc.efg/callset.vcf.gz', ), - '/seqr/seqr-loading-temp/v3.1/GRCh38/SNV_INDEL/relatedness_check/ead56bb177a5de24178e1e622ce1d8beb3f8892bdae1c925d22ca0af4013d6dd.ht', + '/var/seqr/seqr-loading-temp/v3.1/GRCh38/SNV_INDEL/relatedness_check/ead56bb177a5de24178e1e622ce1d8beb3f8892bdae1c925d22ca0af4013d6dd.ht', ) def test_validation_errors_for_run_path(self) -> None: @@ -149,7 +149,7 @@ def test_validation_errors_for_run_path(self) -> None: DatasetType.SNV_INDEL, 'manual__2023-06-26T18:30:09.349671+00:00', ), - '/seqr/seqr-hail-search-data/v3.1/GRCh38/SNV_INDEL/runs/manual__2023-06-26T18:30:09.349671+00:00/validation_errors.json', + '/var/seqr/seqr-hail-search-data/v3.1/GRCh38/SNV_INDEL/runs/manual__2023-06-26T18:30:09.349671+00:00/validation_errors.json', ) def test_metadata_for_run_path(self) -> None: @@ -159,7 +159,7 @@ def test_metadata_for_run_path(self) -> None: DatasetType.SNV_INDEL, 'manual__2023-06-26T18:30:09.349671+00:00', ), - '/seqr/seqr-hail-search-data/v3.1/GRCh38/SNV_INDEL/runs/manual__2023-06-26T18:30:09.349671+00:00/metadata.json', + '/var/seqr/seqr-hail-search-data/v3.1/GRCh38/SNV_INDEL/runs/manual__2023-06-26T18:30:09.349671+00:00/metadata.json', ) def test_variant_annotations_table_path(self) -> None: @@ -168,7 +168,7 @@ def test_variant_annotations_table_path(self) -> None: ReferenceGenome.GRCh38, DatasetType.GCNV, ), - '/seqr/seqr-hail-search-data/v3.1/GRCh38/GCNV/annotations.ht', + '/var/seqr/seqr-hail-search-data/v3.1/GRCh38/GCNV/annotations.ht', ) def test_remapped_and_subsetted_callset_path(self) -> None: @@ -179,7 +179,7 @@ def test_remapped_and_subsetted_callset_path(self) -> None: 'gs://abc.efg/callset.vcf.gz', 'R0111_tgg_bblanken_wes', ), - '/seqr/seqr-loading-temp/v3.1/GRCh38/GCNV/remapped_and_subsetted_callsets/R0111_tgg_bblanken_wes/ead56bb177a5de24178e1e622ce1d8beb3f8892bdae1c925d22ca0af4013d6dd.mt', + '/var/seqr/seqr-loading-temp/v3.1/GRCh38/GCNV/remapped_and_subsetted_callsets/R0111_tgg_bblanken_wes/ead56bb177a5de24178e1e622ce1d8beb3f8892bdae1c925d22ca0af4013d6dd.mt', ) self.assertEqual( remapped_and_subsetted_callset_path( @@ -188,7 +188,7 @@ def test_remapped_and_subsetted_callset_path(self) -> None: 'gs://abc.efg/callset/*.vcf.gz', 'R0111_tgg_bblanken_wes', ), - '/seqr/seqr-loading-temp/v3.1/GRCh38/GCNV/remapped_and_subsetted_callsets/R0111_tgg_bblanken_wes/bce53ccdb49a5ed2513044e1d0c6224e3ffcc323f770dc807d9175fd3c70a050.mt', + '/var/seqr/seqr-loading-temp/v3.1/GRCh38/GCNV/remapped_and_subsetted_callsets/R0111_tgg_bblanken_wes/bce53ccdb49a5ed2513044e1d0c6224e3ffcc323f770dc807d9175fd3c70a050.mt', ) def test_imported_callset_path(self) -> None: @@ -198,7 +198,7 @@ def test_imported_callset_path(self) -> None: DatasetType.SNV_INDEL, 'gs://abc.efg/callset.vcf.gz', ), - '/seqr/seqr-loading-temp/v3.1/GRCh38/SNV_INDEL/imported_callsets/ead56bb177a5de24178e1e622ce1d8beb3f8892bdae1c925d22ca0af4013d6dd.mt', + '/var/seqr/seqr-loading-temp/v3.1/GRCh38/SNV_INDEL/imported_callsets/ead56bb177a5de24178e1e622ce1d8beb3f8892bdae1c925d22ca0af4013d6dd.mt', ) def test_imputed_sex_path(self) -> None: @@ -208,7 +208,7 @@ def test_imputed_sex_path(self) -> None: DatasetType.SNV_INDEL, 'gs://abc.efg/callset.vcf.gz', ), - '/seqr/seqr-loading-temp/v3.1/GRCh38/SNV_INDEL/imputed_sex/ead56bb177a5de24178e1e622ce1d8beb3f8892bdae1c925d22ca0af4013d6dd.tsv', + '/var/seqr/seqr-loading-temp/v3.1/GRCh38/SNV_INDEL/imputed_sex/ead56bb177a5de24178e1e622ce1d8beb3f8892bdae1c925d22ca0af4013d6dd.tsv', ) def test_new_variants_table_path(self) -> None: @@ -218,7 +218,7 @@ def test_new_variants_table_path(self) -> None: DatasetType.SNV_INDEL, 'manual__2023-06-26T18:30:09.349671+00:00', ), - '/seqr/seqr-hail-search-data/v3.1/GRCh38/SNV_INDEL/runs/manual__2023-06-26T18:30:09.349671+00:00/new_variants.ht', + '/var/seqr/seqr-hail-search-data/v3.1/GRCh38/SNV_INDEL/runs/manual__2023-06-26T18:30:09.349671+00:00/new_variants.ht', ) def test_project_remap_path(self) -> None: @@ -229,7 +229,7 @@ def test_project_remap_path(self) -> None: SampleType.WGS, 'R0652_pipeline_test', ), - '/seqr/seqr-loading-temp/v3.1/GRCh38/SNV_INDEL/remaps/WGS/R0652_pipeline_test.ht', + '/var/seqr/seqr-loading-temp/v3.1/GRCh38/SNV_INDEL/remaps/WGS/R0652_pipeline_test.ht', ) def test_project_pedigree_path(self) -> None: @@ -240,5 +240,5 @@ def test_project_pedigree_path(self) -> None: SampleType.WES, 'R0652_pipeline_test', ), - '/seqr/seqr-loading-temp/v3.1/GRCh38/GCNV/pedigrees/WES/R0652_pipeline_test.ht', + '/var/seqr/seqr-loading-temp/v3.1/GRCh38/GCNV/pedigrees/WES/R0652_pipeline_test.ht', ) diff --git a/v03_pipeline/var/luigi_config/luigi-defaults.conf b/v03_pipeline/var/luigi_config/luigi-defaults.conf deleted file mode 100644 index 0f9e9cb44..000000000 --- a/v03_pipeline/var/luigi_config/luigi-defaults.conf +++ /dev/null @@ -1,9 +0,0 @@ -[core] -default_scheduler_url=http://pipeline-runner-ui:8082/ - -[scheduler] -record_task_history = True -state_path = /seqr/luigi-state/luigi-state.pickle - -[task_history] -db_connection = sqlite:///seqr/luigi-state/luigi-task-hist.db \ No newline at end of file