Skip to content

Commit 26c67e4

Browse files
authored
Fix missing sample types in delete dags (#908)
* Fix missing sample types in delete dags * Fix typo
1 parent 900f608 commit 26c67e4

File tree

5 files changed

+120
-26
lines changed

5 files changed

+120
-26
lines changed
Lines changed: 31 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,31 @@
1+
import os
2+
3+
import hail as hl
4+
import hailtop.fs as hfs
5+
6+
from v03_pipeline.lib.model import DatasetType, ReferenceGenome, SampleType
7+
from v03_pipeline.lib.paths import project_table_path
8+
9+
10+
def get_valid_project_tables(
11+
reference_genome: ReferenceGenome,
12+
dataset_type: DatasetType,
13+
project_guid: str,
14+
) -> set[tuple[hl.Table, SampleType]]:
15+
project_tables = set()
16+
for sample_type in SampleType:
17+
project_ht_path = project_table_path(
18+
reference_genome,
19+
dataset_type,
20+
sample_type,
21+
project_guid,
22+
)
23+
if hfs.exists(project_ht_path) and hfs.exists(
24+
os.path.join(project_ht_path, '_SUCCESS'),
25+
):
26+
project_ht = hl.read_table(project_ht_path)
27+
project_tables.add((project_ht, sample_type))
28+
if len(project_tables) == 0:
29+
msg = f'No project tables found for {project_guid}'
30+
raise RuntimeError(msg)
31+
return project_tables

v03_pipeline/lib/tasks/__init__.py

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,7 @@
33
from v03_pipeline.lib.tasks.delete_project_family_tables import (
44
DeleteProjectFamilyTablesTask,
55
)
6-
from v03_pipeline.lib.tasks.delete_project_table import DeleteProjectTableTask
6+
from v03_pipeline.lib.tasks.delete_project_tables import DeleteProjectTablesTask
77
from v03_pipeline.lib.tasks.migrate_all_lookup_tables import MigrateAllLookupTablesTask
88
from v03_pipeline.lib.tasks.migrate_all_variant_annotations_tables import (
99
MigrateAllVariantAnnotationsTablesTask,
@@ -21,8 +21,8 @@
2121
UpdateLookupTableWithDeletedProjectTask,
2222
)
2323
from v03_pipeline.lib.tasks.update_project_table import UpdateProjectTableTask
24-
from v03_pipeline.lib.tasks.update_project_table_with_deleted_families import (
25-
UpdateProjectTableWithDeletedFamiliesTask,
24+
from v03_pipeline.lib.tasks.update_project_tables_with_deleted_families import (
25+
UpdateProjectTablesWithDeletedFamiliesTask,
2626
)
2727
from v03_pipeline.lib.tasks.update_variant_annotations_table_with_deleted_families import (
2828
UpdateVariantAnnotationsTableWithDeletedFamiliesTask,
@@ -42,11 +42,11 @@
4242
'DeleteFamilyTableTask',
4343
'DeleteFamilyTablesTask',
4444
'DeleteProjectFamilyTablesTask',
45-
'DeleteProjectTableTask',
45+
'DeleteProjectTablesTask',
4646
'MigrateAllLookupTablesTask',
4747
'MigrateAllVariantAnnotationsTablesTask',
4848
'UpdateProjectTableTask',
49-
'UpdateProjectTableWithDeletedFamiliesTask',
49+
'UpdateProjectTablesWithDeletedFamiliesTask',
5050
'UpdateLookupTableTask',
5151
'UpdateLookupTableWithDeletedProjectTask',
5252
'UpdateLookupTableWithDeletedFamiliesTask',

v03_pipeline/lib/tasks/delete_project_family_tables.py

Lines changed: 6 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -1,15 +1,12 @@
11
import hail as hl
2-
import hailtop.fs as hfs
32
import luigi
43
import luigi.util
54

6-
from v03_pipeline.lib.model import SampleType
7-
from v03_pipeline.lib.paths import project_table_path
5+
from v03_pipeline.lib.misc.project_tables import get_valid_project_tables
86
from v03_pipeline.lib.tasks.base.base_loading_pipeline_params import (
97
BaseLoadingPipelineParams,
108
)
119
from v03_pipeline.lib.tasks.delete_family_table import DeleteFamilyTableTask
12-
from v03_pipeline.lib.tasks.files import HailTableTask
1310

1411

1512
@luigi.util.inherits(BaseLoadingPipelineParams)
@@ -27,23 +24,11 @@ def complete(self) -> bool:
2724
)
2825

2926
def run(self):
30-
project_tables = set()
31-
for sample_type in SampleType:
32-
project_ht_path = project_table_path(
33-
self.reference_genome,
34-
self.dataset_type,
35-
sample_type,
36-
self.project_guid,
37-
)
38-
if hfs.exists(project_ht_path):
39-
project_table_task: luigi.Target = yield HailTableTask(project_ht_path)
40-
project_ht = hl.read_table(project_table_task.path)
41-
project_tables.add((project_ht, sample_type))
42-
43-
if len(project_tables) == 0:
44-
msg = f'No project tables found for {self.project_guid}'
45-
raise RuntimeError(msg)
46-
27+
project_tables = get_valid_project_tables(
28+
self.reference_genome,
29+
self.dataset_type,
30+
self.project_guid,
31+
)
4732
for project_ht, sample_type in project_tables:
4833
family_guids = hl.eval(project_ht.globals.family_guids)
4934
for family_guid in family_guids:
Lines changed: 34 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,34 @@
1+
import luigi
2+
3+
from v03_pipeline.lib.model import SampleType
4+
from v03_pipeline.lib.tasks.base.base_loading_pipeline_params import (
5+
BaseLoadingPipelineParams,
6+
)
7+
from v03_pipeline.lib.tasks.delete_project_table import DeleteProjectTableTask
8+
9+
10+
@luigi.util.inherits(BaseLoadingPipelineParams)
11+
class DeleteProjectTablesTask(luigi.Task):
12+
project_guid = luigi.Parameter()
13+
14+
def __init__(self, *args, **kwargs):
15+
super().__init__(*args, **kwargs)
16+
self.dynamic_delete_project_table_tasks = set()
17+
18+
def complete(self) -> bool:
19+
return len(self.dynamic_delete_project_table_tasks) >= 1 and all(
20+
delete_project_table_task.complete()
21+
for delete_project_table_task in self.dynamic_delete_project_table_tasks
22+
)
23+
24+
def run(self):
25+
for sample_type in SampleType:
26+
self.dynamic_delete_project_table_tasks.add(
27+
DeleteProjectTableTask(
28+
reference_genome=self.reference_genome,
29+
dataset_type=self.dataset_type,
30+
sample_type=sample_type,
31+
project_guid=self.project_guid,
32+
),
33+
)
34+
yield self.dynamic_delete_project_table_tasks
Lines changed: 44 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,44 @@
1+
import luigi
2+
import luigi.util
3+
4+
from v03_pipeline.lib.misc.project_tables import get_valid_project_tables
5+
from v03_pipeline.lib.tasks.base.base_loading_pipeline_params import (
6+
BaseLoadingPipelineParams,
7+
)
8+
from v03_pipeline.lib.tasks.update_project_table_with_deleted_families import (
9+
UpdateProjectTableWithDeletedFamiliesTask,
10+
)
11+
12+
13+
@luigi.util.inherits(BaseLoadingPipelineParams)
14+
class UpdateProjectTablesWithDeletedFamiliesTask(luigi.Task):
15+
project_guid = luigi.Parameter()
16+
family_guids = luigi.ListParameter()
17+
18+
def __init__(self, *args, **kwargs):
19+
super().__init__(*args, **kwargs)
20+
self.dynamic_update_project_table_tasks = set()
21+
22+
def complete(self) -> bool:
23+
return len(self.dynamic_update_project_table_tasks) >= 1 and all(
24+
update_project_table_task.complete()
25+
for update_project_table_task in self.dynamic_update_project_table_tasks
26+
)
27+
28+
def run(self):
29+
project_tables = get_valid_project_tables(
30+
self.reference_genome,
31+
self.dataset_type,
32+
self.project_guid,
33+
)
34+
for _, sample_type in project_tables:
35+
self.dynamic_update_project_table_tasks.add(
36+
UpdateProjectTableWithDeletedFamiliesTask(
37+
reference_genome=self.reference_genome,
38+
dataset_type=self.dataset_type,
39+
sample_type=sample_type,
40+
project_guid=self.project_guid,
41+
family_guids=self.family_guids,
42+
),
43+
)
44+
yield self.dynamic_update_project_table_tasks

0 commit comments

Comments
 (0)