diff --git a/v03_pipeline/bin/pipeline_worker.py b/v03_pipeline/bin/pipeline_worker.py index 6d624487d..7ed39774d 100755 --- a/v03_pipeline/bin/pipeline_worker.py +++ b/v03_pipeline/bin/pipeline_worker.py @@ -61,7 +61,6 @@ def main(): run_id=datetime.datetime.now(datetime.timezone.utc).strftime( '%Y%m%d-%H%M%S', ), - force=False, **task_kwargs, ), *[ @@ -69,7 +68,6 @@ def main(): project_guid=lpr.projects_to_run[i], project_remap_path=project_remap_paths[i], project_pedigree_path=project_pedigree_paths[i], - force=False, **task_kwargs, ) for i in range(len(lpr.projects_to_run)) diff --git a/v03_pipeline/lib/tasks/base/base_loading_run_params.py b/v03_pipeline/lib/tasks/base/base_loading_run_params.py index a8e8cbde8..4b5ad92dd 100644 --- a/v03_pipeline/lib/tasks/base/base_loading_run_params.py +++ b/v03_pipeline/lib/tasks/base/base_loading_run_params.py @@ -22,10 +22,6 @@ class BaseLoadingRunParams(luigi.Task): default=False, parsing=luigi.BoolParameter.EXPLICIT_PARSING, ) - force = luigi.BoolParameter( - default=False, - parsing=luigi.BoolParameter.EXPLICIT_PARSING, - ) skip_check_sex_and_relatedness = luigi.BoolParameter( default=False, parsing=luigi.BoolParameter.EXPLICIT_PARSING, diff --git a/v03_pipeline/lib/tasks/base/base_update.py b/v03_pipeline/lib/tasks/base/base_update.py index 675eaa1a2..8ecd8f26a 100644 --- a/v03_pipeline/lib/tasks/base/base_update.py +++ b/v03_pipeline/lib/tasks/base/base_update.py @@ -18,9 +18,6 @@ def run(self) -> None: ht = read_fn(self.output().path) ht = self.update_table(ht) write(ht, self.output().path) - # Set force to false after run, allowing "complete()" to succeeded - # when dependencies are re-evaluated. - self.force = False def initialize_table(self) -> hl.Table: raise NotImplementedError diff --git a/v03_pipeline/lib/tasks/base/base_write.py b/v03_pipeline/lib/tasks/base/base_write.py index 7fe6262af..8dade2282 100644 --- a/v03_pipeline/lib/tasks/base/base_write.py +++ b/v03_pipeline/lib/tasks/base/base_write.py @@ -9,9 +9,6 @@ def run(self) -> None: self.init_hail() ht = self.create_table() write(ht, self.output().path) - # Set force to false after run, allowing "complete()" to succeeded - # when dependencies are re-evaluated. - self.force = False def create_table(self) -> hl.Table: raise NotImplementedError diff --git a/v03_pipeline/lib/tasks/update_lookup_table.py b/v03_pipeline/lib/tasks/update_lookup_table.py index 26f525ecc..54fa62103 100644 --- a/v03_pipeline/lib/tasks/update_lookup_table.py +++ b/v03_pipeline/lib/tasks/update_lookup_table.py @@ -27,36 +27,31 @@ class UpdateLookupTableTask(BaseUpdateLookupTableTask): run_id = luigi.Parameter() def complete(self) -> bool: - return ( - not self.force - and super().complete() - and hl.eval( - hl.bind( - lambda updates: hl.all( - [ - updates.contains( - hl.Struct( - callset=self.callset_path, - project_guid=project_guid, - remap_pedigree_hash=remap_pedigree_hash( - self.project_remap_paths[i], - self.project_pedigree_paths[i], - ), + return super().complete() and hl.eval( + hl.bind( + lambda updates: hl.all( + [ + updates.contains( + hl.Struct( + callset=self.callset_path, + project_guid=project_guid, + remap_pedigree_hash=remap_pedigree_hash( + self.project_remap_paths[i], + self.project_pedigree_paths[i], ), - ) - for i, project_guid in enumerate(self.project_guids) - ], - ), - hl.read_table(self.output().path).updates, + ), + ) + for i, project_guid in enumerate(self.project_guids) + ], ), - ) + hl.read_table(self.output().path).updates, + ), ) def requires(self) -> list[luigi.Task]: return [ self.clone( WriteMetadataForRunTask, - force=False, ), ] diff --git a/v03_pipeline/lib/tasks/update_project_table.py b/v03_pipeline/lib/tasks/update_project_table.py index a2ca742ba..cd582009f 100644 --- a/v03_pipeline/lib/tasks/update_project_table.py +++ b/v03_pipeline/lib/tasks/update_project_table.py @@ -24,24 +24,20 @@ class UpdateProjectTableTask(BaseUpdateProjectTableTask): project_pedigree_path = luigi.Parameter() def complete(self) -> bool: - return ( - not self.force - and super().complete() - and hl.eval( - hl.read_table(self.output().path).updates.contains( - hl.Struct( - callset=self.callset_path, - remap_pedigree_hash=remap_pedigree_hash( - self.project_remap_path, - self.project_pedigree_path, - ), + return super().complete() and hl.eval( + hl.read_table(self.output().path).updates.contains( + hl.Struct( + callset=self.callset_path, + remap_pedigree_hash=remap_pedigree_hash( + self.project_remap_path, + self.project_pedigree_path, ), ), - ) + ), ) def requires(self) -> luigi.Task: - return self.clone(WriteRemappedAndSubsettedCallsetTask, force=False) + return self.clone(WriteRemappedAndSubsettedCallsetTask) def update_table(self, ht: hl.Table) -> hl.Table: callset_mt = hl.read_matrix_table(self.input().path) diff --git a/v03_pipeline/lib/tasks/update_variant_annotations_table_with_new_samples.py b/v03_pipeline/lib/tasks/update_variant_annotations_table_with_new_samples.py index ef6442f22..559f65bd8 100644 --- a/v03_pipeline/lib/tasks/update_variant_annotations_table_with_new_samples.py +++ b/v03_pipeline/lib/tasks/update_variant_annotations_table_with_new_samples.py @@ -32,29 +32,25 @@ def requires(self) -> list[luigi.Task]: ] def complete(self) -> bool: - return ( - not self.force - and super().complete() - and hl.eval( - hl.bind( - lambda updates: hl.all( - [ - updates.contains( - hl.Struct( - callset=self.callset_path, - project_guid=project_guid, - remap_pedigree_hash=remap_pedigree_hash( - self.project_remap_paths[i], - self.project_pedigree_paths[i], - ), + return super().complete() and hl.eval( + hl.bind( + lambda updates: hl.all( + [ + updates.contains( + hl.Struct( + callset=self.callset_path, + project_guid=project_guid, + remap_pedigree_hash=remap_pedigree_hash( + self.project_remap_paths[i], + self.project_pedigree_paths[i], ), - ) - for i, project_guid in enumerate(self.project_guids) - ], - ), - hl.read_table(self.output().path).updates, + ), + ) + for i, project_guid in enumerate(self.project_guids) + ], ), - ) + hl.read_table(self.output().path).updates, + ), ) def update_table(self, ht: hl.Table) -> hl.Table: diff --git a/v03_pipeline/lib/tasks/validate_callset.py b/v03_pipeline/lib/tasks/validate_callset.py index 972e1794c..d0cf49d04 100644 --- a/v03_pipeline/lib/tasks/validate_callset.py +++ b/v03_pipeline/lib/tasks/validate_callset.py @@ -31,7 +31,7 @@ @luigi.util.inherits(BaseLoadingRunParams) class ValidateCallsetTask(BaseUpdateTask): def complete(self) -> luigi.Target: - if not self.force and super().complete(): + if super().complete(): mt = hl.read_matrix_table(self.output().path) return hasattr(mt, 'validated_sample_type') and hl.eval( self.sample_type.value == mt.validated_sample_type, @@ -49,7 +49,7 @@ def output(self) -> luigi.Target: def requires(self) -> list[luigi.Task]: requirements = [ - self.clone(WriteImportedCallsetTask, force=False), + self.clone(WriteImportedCallsetTask), ] if not self.skip_validation and self.dataset_type.can_run_validation: requirements = [ diff --git a/v03_pipeline/lib/tasks/write_family_table.py b/v03_pipeline/lib/tasks/write_family_table.py index 4d6683664..42715aff9 100644 --- a/v03_pipeline/lib/tasks/write_family_table.py +++ b/v03_pipeline/lib/tasks/write_family_table.py @@ -29,16 +29,12 @@ def output(self) -> luigi.Target: ) def complete(self) -> bool: - return ( - not self.force - and super().complete() - and hl.eval( - hl.read_table(self.output().path).updates.contains(self.callset_path), - ) + return super().complete() and hl.eval( + hl.read_table(self.output().path).updates.contains(self.callset_path), ) def requires(self) -> luigi.Task: - return self.clone(UpdateProjectTableTask, force=False) + return self.clone(UpdateProjectTableTask) def create_table(self) -> hl.Table: project_ht = hl.read_table(self.input().path) diff --git a/v03_pipeline/lib/tasks/write_imported_callset.py b/v03_pipeline/lib/tasks/write_imported_callset.py index 774e1b1f3..2ecce4f38 100644 --- a/v03_pipeline/lib/tasks/write_imported_callset.py +++ b/v03_pipeline/lib/tasks/write_imported_callset.py @@ -23,7 +23,7 @@ @luigi.util.inherits(BaseLoadingRunParams) class WriteImportedCallsetTask(BaseWriteTask): def complete(self) -> luigi.Target: - return not self.force and super().complete() + return super().complete() def output(self) -> luigi.Target: return GCSorLocalTarget( diff --git a/v03_pipeline/lib/tasks/write_new_variants_table.py b/v03_pipeline/lib/tasks/write_new_variants_table.py index 074ac101c..1def006cc 100644 --- a/v03_pipeline/lib/tasks/write_new_variants_table.py +++ b/v03_pipeline/lib/tasks/write_new_variants_table.py @@ -89,7 +89,6 @@ def requires(self) -> list[luigi.Task]: ] if self.dataset_type.has_lookup_table: # NB: the lookup table task has remapped and subsetted callset tasks as dependencies. - # Also note that force is passed here, requirements = [ *requirements, self.clone(UpdateLookupTableTask), @@ -97,7 +96,7 @@ def requires(self) -> list[luigi.Task]: else: requirements = [ *requirements, - self.clone(WriteMetadataForRunTask, force=False), + self.clone(WriteMetadataForRunTask), ] return requirements diff --git a/v03_pipeline/lib/tasks/write_project_family_tables.py b/v03_pipeline/lib/tasks/write_project_family_tables.py index 366226bde..f9b7df74f 100644 --- a/v03_pipeline/lib/tasks/write_project_family_tables.py +++ b/v03_pipeline/lib/tasks/write_project_family_tables.py @@ -21,13 +21,9 @@ def __init__(self, *args, **kwargs): self.dynamic_write_family_table_tasks = set() def complete(self) -> bool: - return ( - not self.force - and len(self.dynamic_write_family_table_tasks) >= 1 - and all( - write_family_table_task.complete() - for write_family_table_task in self.dynamic_write_family_table_tasks - ) + return len(self.dynamic_write_family_table_tasks) >= 1 and all( + write_family_table_task.complete() + for write_family_table_task in self.dynamic_write_family_table_tasks ) def run(self): @@ -35,7 +31,6 @@ def run(self): # Fetch family guids from project table update_project_table_task: luigi.Target = yield self.clone( UpdateProjectTableTask, - force=False, ) project_ht = hl.read_table(update_project_table_task.path) family_guids_in_project_table = set(hl.eval(project_ht.globals.family_guids)) diff --git a/v03_pipeline/lib/tasks/write_remapped_and_subsetted_callset.py b/v03_pipeline/lib/tasks/write_remapped_and_subsetted_callset.py index b1e4bc547..f4c08f1dc 100644 --- a/v03_pipeline/lib/tasks/write_remapped_and_subsetted_callset.py +++ b/v03_pipeline/lib/tasks/write_remapped_and_subsetted_callset.py @@ -37,16 +37,12 @@ class WriteRemappedAndSubsettedCallsetTask(BaseWriteTask): project_pedigree_path = luigi.Parameter() def complete(self) -> luigi.Target: - return ( - not self.force - and super().complete() - and hl.eval( - hl.read_matrix_table(self.output().path).globals.remap_pedigree_hash - == remap_pedigree_hash( - self.project_remap_path, - self.project_pedigree_path, - ), - ) + return super().complete() and hl.eval( + hl.read_matrix_table(self.output().path).globals.remap_pedigree_hash + == remap_pedigree_hash( + self.project_remap_path, + self.project_pedigree_path, + ), ) def output(self) -> luigi.Target: @@ -61,7 +57,7 @@ def output(self) -> luigi.Target: def requires(self) -> list[luigi.Task]: requirements = [ - self.clone(ValidateCallsetTask, force=False), + self.clone(ValidateCallsetTask), RawFileTask(self.project_pedigree_path), ] if ( diff --git a/v03_pipeline/lib/tasks/write_variant_annotations_vcf.py b/v03_pipeline/lib/tasks/write_variant_annotations_vcf.py index 37a8b46eb..4601103ea 100644 --- a/v03_pipeline/lib/tasks/write_variant_annotations_vcf.py +++ b/v03_pipeline/lib/tasks/write_variant_annotations_vcf.py @@ -24,7 +24,7 @@ def complete(self) -> bool: return not self.dataset_type.should_export_to_vcf def requires(self) -> luigi.Task: - return self.clone(BaseUpdateVariantAnnotationsTableTask, force=False) + return self.clone(BaseUpdateVariantAnnotationsTableTask) def run(self) -> None: ht = hl.read_table(self.input().path)