Skip to content

Dev #897

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 3 commits into from
Sep 17, 2024
Merged

Dev #897

Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 0 additions & 2 deletions v03_pipeline/bin/pipeline_worker.py
Original file line number Diff line number Diff line change
Expand Up @@ -61,15 +61,13 @@ def main():
run_id=datetime.datetime.now(datetime.timezone.utc).strftime(
'%Y%m%d-%H%M%S',
),
force=False,
**task_kwargs,
),
*[
WriteProjectFamilyTablesTask(
project_guid=lpr.projects_to_run[i],
project_remap_path=project_remap_paths[i],
project_pedigree_path=project_pedigree_paths[i],
force=False,
**task_kwargs,
)
for i in range(len(lpr.projects_to_run))
Expand Down
4 changes: 0 additions & 4 deletions v03_pipeline/lib/tasks/base/base_loading_run_params.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,10 +22,6 @@ class BaseLoadingRunParams(luigi.Task):
default=False,
parsing=luigi.BoolParameter.EXPLICIT_PARSING,
)
force = luigi.BoolParameter(
default=False,
parsing=luigi.BoolParameter.EXPLICIT_PARSING,
)
skip_check_sex_and_relatedness = luigi.BoolParameter(
default=False,
parsing=luigi.BoolParameter.EXPLICIT_PARSING,
Expand Down
3 changes: 0 additions & 3 deletions v03_pipeline/lib/tasks/base/base_update.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,9 +18,6 @@ def run(self) -> None:
ht = read_fn(self.output().path)
ht = self.update_table(ht)
write(ht, self.output().path)
# Set force to false after run, allowing "complete()" to succeeded
# when dependencies are re-evaluated.
self.force = False

def initialize_table(self) -> hl.Table:
raise NotImplementedError
Expand Down
3 changes: 0 additions & 3 deletions v03_pipeline/lib/tasks/base/base_write.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,9 +9,6 @@ def run(self) -> None:
self.init_hail()
ht = self.create_table()
write(ht, self.output().path)
# Set force to false after run, allowing "complete()" to succeeded
# when dependencies are re-evaluated.
self.force = False

def create_table(self) -> hl.Table:
raise NotImplementedError
39 changes: 17 additions & 22 deletions v03_pipeline/lib/tasks/update_lookup_table.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,36 +27,31 @@ class UpdateLookupTableTask(BaseUpdateLookupTableTask):
run_id = luigi.Parameter()

def complete(self) -> bool:
return (
not self.force
and super().complete()
and hl.eval(
hl.bind(
lambda updates: hl.all(
[
updates.contains(
hl.Struct(
callset=self.callset_path,
project_guid=project_guid,
remap_pedigree_hash=remap_pedigree_hash(
self.project_remap_paths[i],
self.project_pedigree_paths[i],
),
return super().complete() and hl.eval(
hl.bind(
lambda updates: hl.all(
[
updates.contains(
hl.Struct(
callset=self.callset_path,
project_guid=project_guid,
remap_pedigree_hash=remap_pedigree_hash(
self.project_remap_paths[i],
self.project_pedigree_paths[i],
),
)
for i, project_guid in enumerate(self.project_guids)
],
),
hl.read_table(self.output().path).updates,
),
)
for i, project_guid in enumerate(self.project_guids)
],
),
)
hl.read_table(self.output().path).updates,
),
)

def requires(self) -> list[luigi.Task]:
return [
self.clone(
WriteMetadataForRunTask,
force=False,
),
]

Expand Down
22 changes: 9 additions & 13 deletions v03_pipeline/lib/tasks/update_project_table.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,24 +24,20 @@ class UpdateProjectTableTask(BaseUpdateProjectTableTask):
project_pedigree_path = luigi.Parameter()

def complete(self) -> bool:
return (
not self.force
and super().complete()
and hl.eval(
hl.read_table(self.output().path).updates.contains(
hl.Struct(
callset=self.callset_path,
remap_pedigree_hash=remap_pedigree_hash(
self.project_remap_path,
self.project_pedigree_path,
),
return super().complete() and hl.eval(
hl.read_table(self.output().path).updates.contains(
hl.Struct(
callset=self.callset_path,
remap_pedigree_hash=remap_pedigree_hash(
self.project_remap_path,
self.project_pedigree_path,
),
),
)
),
)

def requires(self) -> luigi.Task:
return self.clone(WriteRemappedAndSubsettedCallsetTask, force=False)
return self.clone(WriteRemappedAndSubsettedCallsetTask)

def update_table(self, ht: hl.Table) -> hl.Table:
callset_mt = hl.read_matrix_table(self.input().path)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,29 +32,25 @@ def requires(self) -> list[luigi.Task]:
]

def complete(self) -> bool:
return (
not self.force
and super().complete()
and hl.eval(
hl.bind(
lambda updates: hl.all(
[
updates.contains(
hl.Struct(
callset=self.callset_path,
project_guid=project_guid,
remap_pedigree_hash=remap_pedigree_hash(
self.project_remap_paths[i],
self.project_pedigree_paths[i],
),
return super().complete() and hl.eval(
hl.bind(
lambda updates: hl.all(
[
updates.contains(
hl.Struct(
callset=self.callset_path,
project_guid=project_guid,
remap_pedigree_hash=remap_pedigree_hash(
self.project_remap_paths[i],
self.project_pedigree_paths[i],
),
)
for i, project_guid in enumerate(self.project_guids)
],
),
hl.read_table(self.output().path).updates,
),
)
for i, project_guid in enumerate(self.project_guids)
],
),
)
hl.read_table(self.output().path).updates,
),
)

def update_table(self, ht: hl.Table) -> hl.Table:
Expand Down
4 changes: 2 additions & 2 deletions v03_pipeline/lib/tasks/validate_callset.py
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@
@luigi.util.inherits(BaseLoadingRunParams)
class ValidateCallsetTask(BaseUpdateTask):
def complete(self) -> luigi.Target:
if not self.force and super().complete():
if super().complete():
mt = hl.read_matrix_table(self.output().path)
return hasattr(mt, 'validated_sample_type') and hl.eval(
self.sample_type.value == mt.validated_sample_type,
Expand All @@ -49,7 +49,7 @@ def output(self) -> luigi.Target:

def requires(self) -> list[luigi.Task]:
requirements = [
self.clone(WriteImportedCallsetTask, force=False),
self.clone(WriteImportedCallsetTask),
]
if not self.skip_validation and self.dataset_type.can_run_validation:
requirements = [
Expand Down
10 changes: 3 additions & 7 deletions v03_pipeline/lib/tasks/write_family_table.py
Original file line number Diff line number Diff line change
Expand Up @@ -29,16 +29,12 @@ def output(self) -> luigi.Target:
)

def complete(self) -> bool:
return (
not self.force
and super().complete()
and hl.eval(
hl.read_table(self.output().path).updates.contains(self.callset_path),
)
return super().complete() and hl.eval(
hl.read_table(self.output().path).updates.contains(self.callset_path),
)

def requires(self) -> luigi.Task:
return self.clone(UpdateProjectTableTask, force=False)
return self.clone(UpdateProjectTableTask)

def create_table(self) -> hl.Table:
project_ht = hl.read_table(self.input().path)
Expand Down
2 changes: 1 addition & 1 deletion v03_pipeline/lib/tasks/write_imported_callset.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@
@luigi.util.inherits(BaseLoadingRunParams)
class WriteImportedCallsetTask(BaseWriteTask):
def complete(self) -> luigi.Target:
return not self.force and super().complete()
return super().complete()

def output(self) -> luigi.Target:
return GCSorLocalTarget(
Expand Down
3 changes: 1 addition & 2 deletions v03_pipeline/lib/tasks/write_new_variants_table.py
Original file line number Diff line number Diff line change
Expand Up @@ -89,15 +89,14 @@ def requires(self) -> list[luigi.Task]:
]
if self.dataset_type.has_lookup_table:
# NB: the lookup table task has remapped and subsetted callset tasks as dependencies.
# Also note that force is passed here,
requirements = [
*requirements,
self.clone(UpdateLookupTableTask),
]
else:
requirements = [
*requirements,
self.clone(WriteMetadataForRunTask, force=False),
self.clone(WriteMetadataForRunTask),
]
return requirements

Expand Down
11 changes: 3 additions & 8 deletions v03_pipeline/lib/tasks/write_project_family_tables.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,21 +21,16 @@ def __init__(self, *args, **kwargs):
self.dynamic_write_family_table_tasks = set()

def complete(self) -> bool:
return (
not self.force
and len(self.dynamic_write_family_table_tasks) >= 1
and all(
write_family_table_task.complete()
for write_family_table_task in self.dynamic_write_family_table_tasks
)
return len(self.dynamic_write_family_table_tasks) >= 1 and all(
write_family_table_task.complete()
for write_family_table_task in self.dynamic_write_family_table_tasks
)

def run(self):
# https://luigi.readthedocs.io/en/stable/tasks.html#dynamic-dependencies
# Fetch family guids from project table
update_project_table_task: luigi.Target = yield self.clone(
UpdateProjectTableTask,
force=False,
)
project_ht = hl.read_table(update_project_table_task.path)
family_guids_in_project_table = set(hl.eval(project_ht.globals.family_guids))
Expand Down
18 changes: 7 additions & 11 deletions v03_pipeline/lib/tasks/write_remapped_and_subsetted_callset.py
Original file line number Diff line number Diff line change
Expand Up @@ -37,16 +37,12 @@ class WriteRemappedAndSubsettedCallsetTask(BaseWriteTask):
project_pedigree_path = luigi.Parameter()

def complete(self) -> luigi.Target:
return (
not self.force
and super().complete()
and hl.eval(
hl.read_matrix_table(self.output().path).globals.remap_pedigree_hash
== remap_pedigree_hash(
self.project_remap_path,
self.project_pedigree_path,
),
)
return super().complete() and hl.eval(
hl.read_matrix_table(self.output().path).globals.remap_pedigree_hash
== remap_pedigree_hash(
self.project_remap_path,
self.project_pedigree_path,
),
)

def output(self) -> luigi.Target:
Expand All @@ -61,7 +57,7 @@ def output(self) -> luigi.Target:

def requires(self) -> list[luigi.Task]:
requirements = [
self.clone(ValidateCallsetTask, force=False),
self.clone(ValidateCallsetTask),
RawFileTask(self.project_pedigree_path),
]
if (
Expand Down
2 changes: 1 addition & 1 deletion v03_pipeline/lib/tasks/write_variant_annotations_vcf.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ def complete(self) -> bool:
return not self.dataset_type.should_export_to_vcf

def requires(self) -> luigi.Task:
return self.clone(BaseUpdateVariantAnnotationsTableTask, force=False)
return self.clone(BaseUpdateVariantAnnotationsTableTask)

def run(self) -> None:
ht = hl.read_table(self.input().path)
Expand Down
Loading