Skip to content

Commit ebc50a3

Browse files
authored
Merge pull request #762 from broadinstitute/dev
Dev
2 parents 16f71ac + fe49297 commit ebc50a3

21 files changed

+602
-448
lines changed
Lines changed: 29 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,29 @@
1+
import hail as hl
2+
3+
from v03_pipeline.lib.model import (
4+
DatasetType,
5+
ReferenceDatasetCollection,
6+
ReferenceGenome,
7+
)
8+
from v03_pipeline.lib.paths import (
9+
valid_reference_dataset_collection_path,
10+
)
11+
12+
13+
def get_rdc_annotation_dependencies(
14+
dataset_type: DatasetType,
15+
reference_genome: ReferenceGenome,
16+
) -> dict[str, hl.Table]:
17+
deps = {}
18+
for rdc in ReferenceDatasetCollection.for_reference_genome_dataset_type(
19+
reference_genome,
20+
dataset_type,
21+
):
22+
deps[f'{rdc.value}_ht'] = hl.read_table(
23+
valid_reference_dataset_collection_path(
24+
reference_genome,
25+
dataset_type,
26+
rdc,
27+
),
28+
)
29+
return deps

v03_pipeline/lib/misc/callsets.py

Lines changed: 70 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,70 @@
1+
import functools
2+
3+
import hail as hl
4+
5+
from v03_pipeline.lib.model import DatasetType, ReferenceGenome
6+
from v03_pipeline.lib.paths import remapped_and_subsetted_callset_path
7+
8+
9+
def get_callset_ht(
10+
reference_genome: ReferenceGenome,
11+
dataset_type: DatasetType,
12+
callset_paths: list[str],
13+
project_guids: list[str],
14+
project_remap_paths: list[str],
15+
project_pedigree_paths: list[str],
16+
):
17+
callset_hts = [
18+
hl.read_matrix_table(
19+
remapped_and_subsetted_callset_path(
20+
reference_genome,
21+
dataset_type,
22+
callset_path,
23+
project_guid,
24+
),
25+
).rows()
26+
for (callset_path, project_guid, _, _) in callset_project_pairs(
27+
callset_paths,
28+
project_guids,
29+
project_remap_paths,
30+
project_pedigree_paths,
31+
)
32+
]
33+
34+
# Drop any fields potentially unshared/unused by the annotations.
35+
for i, callset_ht in enumerate(callset_hts):
36+
for row_field in dataset_type.optional_row_fields:
37+
if hasattr(callset_ht, row_field):
38+
callset_hts[i] = callset_ht.drop(row_field)
39+
40+
callset_ht = functools.reduce(
41+
(lambda ht1, ht2: ht1.union(ht2, unify=True)),
42+
callset_hts,
43+
)
44+
return callset_ht.distinct()
45+
46+
47+
def callset_project_pairs(
48+
callset_paths: list[str],
49+
project_guids: list[str],
50+
project_remap_paths: list[str],
51+
project_pedigree_paths: list[str],
52+
):
53+
if len(callset_paths) == len(project_guids):
54+
return zip(
55+
callset_paths,
56+
project_guids,
57+
project_remap_paths,
58+
project_pedigree_paths,
59+
strict=True,
60+
)
61+
return (
62+
(callset_path, project_guid, project_remap_path, project_pedigree_path)
63+
for callset_path in callset_paths
64+
for (project_guid, project_remap_path, project_pedigree_path) in zip(
65+
project_guids,
66+
project_remap_paths,
67+
project_pedigree_paths,
68+
strict=True,
69+
)
70+
)

v03_pipeline/lib/misc/util.py

Lines changed: 0 additions & 24 deletions
This file was deleted.

v03_pipeline/lib/paths.py

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -236,3 +236,18 @@ def variant_annotations_table_path(
236236
),
237237
'annotations.ht',
238238
)
239+
240+
241+
def new_variants_table_path(
242+
reference_genome: ReferenceGenome,
243+
dataset_type: DatasetType,
244+
run_id: str,
245+
) -> str:
246+
return os.path.join(
247+
runs_path(
248+
reference_genome,
249+
dataset_type,
250+
),
251+
run_id,
252+
'new_variants.ht',
253+
)

v03_pipeline/lib/paths_test.py

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,7 @@
1212
imported_callset_path,
1313
lookup_table_path,
1414
metadata_for_run_path,
15+
new_variants_table_path,
1516
project_table_path,
1617
relatedness_check_table_path,
1718
remapped_and_subsetted_callset_path,
@@ -160,3 +161,13 @@ def test_imported_callset_path(self) -> None:
160161
),
161162
'/seqr-loading-temp/v03/GRCh38/SNV_INDEL/imported_callsets/ead56bb177a5de24178e1e622ce1d8beb3f8892bdae1c925d22ca0af4013d6dd.mt',
162163
)
164+
165+
def test_new_variants_table_path(self) -> None:
166+
self.assertEqual(
167+
new_variants_table_path(
168+
ReferenceGenome.GRCh38,
169+
DatasetType.SNV_INDEL,
170+
'manual__2023-06-26T18:30:09.349671+00:00',
171+
),
172+
'/hail-search-data/v03/GRCh38/SNV_INDEL/runs/manual__2023-06-26T18:30:09.349671+00:00/new_variants.ht',
173+
)

v03_pipeline/lib/tasks/__init__.py

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,3 @@
1-
from v03_pipeline.lib.tasks.delete_old_runs import DeleteOldRunsTask
21
from v03_pipeline.lib.tasks.reference_data.write_cached_reference_dataset_query import (
32
WriteCachedReferenceDatasetQuery,
43
)
@@ -15,7 +14,6 @@
1514
)
1615

1716
__all__ = [
18-
'DeleteOldRunsTask',
1917
'UpdateProjectTableTask',
2018
'UpdateLookupTableTask',
2119
'UpdateVariantAnnotationsTableWithNewSamplesTask',

v03_pipeline/lib/tasks/base/base_update_task.py

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,9 @@ def run(self) -> None:
1313
ht = hl.read_table(self.output().path)
1414
ht = self.update_table(ht)
1515
write(ht, self.output().path)
16+
# Set force to false after run, allowing "complete()" to succeeded
17+
# when dependencies are re-evaluated.
18+
self.force = False
1619

1720
def initialize_table(self) -> hl.Table:
1821
raise NotImplementedError

v03_pipeline/lib/tasks/base/base_variant_annotations_table.py

Lines changed: 8 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,13 @@
22
import luigi
33

44
from v03_pipeline.lib.annotations.enums import annotate_enums
5-
from v03_pipeline.lib.model import Env, ReferenceDatasetCollection
5+
from v03_pipeline.lib.annotations.rdc_dependencies import (
6+
get_rdc_annotation_dependencies,
7+
)
8+
from v03_pipeline.lib.model import (
9+
Env,
10+
ReferenceDatasetCollection,
11+
)
612
from v03_pipeline.lib.paths import (
713
valid_reference_dataset_collection_path,
814
variant_annotations_table_path,
@@ -17,19 +23,7 @@
1723
class BaseVariantAnnotationsTableTask(BaseUpdateTask):
1824
@property
1925
def rdc_annotation_dependencies(self) -> dict[str, hl.Table]:
20-
annotation_dependencies = {}
21-
for rdc in ReferenceDatasetCollection.for_reference_genome_dataset_type(
22-
self.reference_genome,
23-
self.dataset_type,
24-
):
25-
annotation_dependencies[f'{rdc.value}_ht'] = hl.read_table(
26-
valid_reference_dataset_collection_path(
27-
self.reference_genome,
28-
self.dataset_type,
29-
rdc,
30-
),
31-
)
32-
return annotation_dependencies
26+
return get_rdc_annotation_dependencies(self.dataset_type, self.reference_genome)
3327

3428
def output(self) -> luigi.Target:
3529
return GCSorLocalTarget(

v03_pipeline/lib/tasks/base/base_write_task.py

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,9 @@ def run(self) -> None:
99
self.init_hail()
1010
ht = self.create_table()
1111
write(ht, self.output().path)
12+
# Set force to false after run, allowing "complete()" to succeeded
13+
# when dependencies are re-evaluated.
14+
self.force = False
1215

1316
def create_table(self) -> hl.Table:
1417
raise NotImplementedError

v03_pipeline/lib/tasks/delete_old_runs.py

Lines changed: 0 additions & 53 deletions
This file was deleted.

0 commit comments

Comments
 (0)