Skip to content

Commit 7a1966d

Browse files
authored
metadata parameters refactor (#946)
* metadata parameters refactor
* fix missing param
* tweak
* missed one
* last one
* fix test
* last few bugfixes
* fix
* bump
* missed one
* change parameter type due to confusing bug
* push
* enum
1 parent f13df93 commit 7a1966d

21 files changed

+160
-144
lines changed

v03_pipeline/lib/tasks/base/base_loading_run_params.py

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,9 @@ class BaseLoadingRunParams(luigi.Task):
1919
run_id = luigi.Parameter()
2020
sample_type = luigi.EnumParameter(enum=SampleType)
2121
callset_path = luigi.Parameter()
22+
project_guids = luigi.ListParameter(default=[])
23+
project_remap_paths = luigi.ListParameter(default=[])
24+
project_pedigree_paths = luigi.ListParameter(default=[])
2225
ignore_missing_samples_when_remapping = luigi.BoolParameter(
2326
default=False,
2427
parsing=luigi.BoolParameter.EXPLICIT_PARSING,

v03_pipeline/lib/tasks/base/base_project_info_params.py

Lines changed: 0 additions & 11 deletions
This file was deleted.

v03_pipeline/lib/tasks/base/base_update_project_table.py

Lines changed: 0 additions & 42 deletions
This file was deleted.

v03_pipeline/lib/tasks/trigger_hail_backend_reload.py

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -4,15 +4,15 @@
44

55
from v03_pipeline.lib.logger import get_logger
66
from v03_pipeline.lib.model import Env
7-
from v03_pipeline.lib.tasks.base.base_project_info_params import (
8-
BaseLoadingRunWithProjectInfoParams,
7+
from v03_pipeline.lib.tasks.base.base_loading_run_params import (
8+
BaseLoadingRunParams,
99
)
1010
from v03_pipeline.lib.tasks.write_success_file import WriteSuccessFileTask
1111

1212
logger = get_logger(__name__)
1313

1414

15-
@luigi.util.inherits(BaseLoadingRunWithProjectInfoParams)
15+
@luigi.util.inherits(BaseLoadingRunParams)
1616
class TriggerHailBackendReload(luigi.Task):
1717
def __init__(self, *args, **kwargs):
1818
super().__init__(*args, **kwargs)

v03_pipeline/lib/tasks/update_lookup_table.py

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -10,8 +10,8 @@
1010
)
1111
from v03_pipeline.lib.model.constants import PROJECTS_EXCLUDED_FROM_LOOKUP
1212
from v03_pipeline.lib.paths import remapped_and_subsetted_callset_path
13-
from v03_pipeline.lib.tasks.base.base_project_info_params import (
14-
BaseLoadingRunWithProjectInfoParams,
13+
from v03_pipeline.lib.tasks.base.base_loading_run_params import (
14+
BaseLoadingRunParams,
1515
)
1616
from v03_pipeline.lib.tasks.base.base_update_lookup_table import (
1717
BaseUpdateLookupTableTask,
@@ -21,7 +21,7 @@
2121
)
2222

2323

24-
@luigi.util.inherits(BaseLoadingRunWithProjectInfoParams)
24+
@luigi.util.inherits(BaseLoadingRunParams)
2525
class UpdateLookupTableTask(BaseUpdateLookupTableTask):
2626
def complete(self) -> bool:
2727
return super().complete() and hl.eval(

v03_pipeline/lib/tasks/update_project_table.py

Lines changed: 40 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -9,28 +9,39 @@
99
remove_family_guids,
1010
)
1111
from v03_pipeline.lib.misc.io import remap_pedigree_hash
12+
from v03_pipeline.lib.paths import project_table_path
1213
from v03_pipeline.lib.tasks.base.base_loading_run_params import BaseLoadingRunParams
13-
from v03_pipeline.lib.tasks.base.base_update_project_table import (
14-
BaseUpdateProjectTableTask,
14+
from v03_pipeline.lib.tasks.base.base_update import (
15+
BaseUpdateTask,
1516
)
17+
from v03_pipeline.lib.tasks.files import GCSorLocalTarget
1618
from v03_pipeline.lib.tasks.write_remapped_and_subsetted_callset import (
1719
WriteRemappedAndSubsettedCallsetTask,
1820
)
1921

2022

2123
@luigi.util.inherits(BaseLoadingRunParams)
22-
class UpdateProjectTableTask(BaseUpdateProjectTableTask):
23-
project_remap_path = luigi.Parameter()
24-
project_pedigree_path = luigi.Parameter()
24+
class UpdateProjectTableTask(BaseUpdateTask):
25+
project_i = luigi.IntParameter()
26+
27+
def output(self) -> luigi.Target:
    """Return the target for the project table of the selected project.

    The project is chosen by indexing ``project_guids`` with ``project_i``;
    the table path is derived from the reference genome, dataset type,
    sample type and that project guid.
    """
    table_path = project_table_path(
        self.reference_genome,
        self.dataset_type,
        self.sample_type,
        self.project_guids[self.project_i],
    )
    return GCSorLocalTarget(table_path)
2536

2637
def complete(self) -> bool:
2738
return super().complete() and hl.eval(
2839
hl.read_table(self.output().path).updates.contains(
2940
hl.Struct(
3041
callset=self.callset_path,
3142
remap_pedigree_hash=remap_pedigree_hash(
32-
self.project_remap_path,
33-
self.project_pedigree_path,
43+
self.project_remap_paths[self.project_i],
44+
self.project_pedigree_paths[self.project_i],
3445
),
3546
),
3647
),
@@ -39,6 +50,26 @@ def complete(self) -> bool:
3950
def requires(self) -> luigi.Task:
    """Return the upstream task: this task cloned as a
    ``WriteRemappedAndSubsettedCallsetTask``.
    """
    upstream_task = self.clone(WriteRemappedAndSubsettedCallsetTask)
    return upstream_task
4152

53+
def initialize_table(self) -> hl.Table:
    """Build a zero-row table carrying the initial project table schema.

    Rows are keyed by the dataset type's key fields for this reference
    genome and hold a single ``filters`` field (a set of strings).  The
    globals start empty: ``family_guids`` (array of str),
    ``family_samples`` (dict of str -> array of str) and ``updates``
    (set of ``{callset, remap_pedigree_hash}`` structs).
    """
    key_type = self.dataset_type.table_key_type(self.reference_genome)
    # NB: ``entries`` is left out of the row schema because it is untyped
    # until we read the type off of the first callset aggregation.
    row_type = hl.tstruct(**key_type, filters=hl.tset(hl.tstr))
    key_fields = key_type.fields
    initial_globals = hl.Struct(
        family_guids=hl.empty_array(hl.tstr),
        family_samples=hl.empty_dict(hl.tstr, hl.tarray(hl.tstr)),
        updates=hl.empty_set(
            hl.tstruct(callset=hl.tstr, remap_pedigree_hash=hl.tint32),
        ),
    )
    return hl.Table.parallelize(
        [],
        row_type,
        key=key_fields,
        globals=initial_globals,
    )
72+
4273
def update_table(self, ht: hl.Table) -> hl.Table:
4374
callset_mt = hl.read_matrix_table(self.input().path)
4475
callset_ht = compute_callset_family_entries_ht(
@@ -69,8 +100,8 @@ def update_table(self, ht: hl.Table) -> hl.Table:
69100
hl.Struct(
70101
callset=self.callset_path,
71102
remap_pedigree_hash=remap_pedigree_hash(
72-
self.project_remap_path,
73-
self.project_pedigree_path,
103+
self.project_remap_paths[self.project_i],
104+
self.project_pedigree_paths[self.project_i],
74105
),
75106
),
76107
),

v03_pipeline/lib/tasks/update_project_table_test.py

Lines changed: 12 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -25,9 +25,10 @@ def test_update_project_table_task(self) -> None:
2525
run_id=TEST_RUN_ID,
2626
sample_type=SampleType.WGS,
2727
callset_path=TEST_VCF,
28-
project_guid='R0113_test_project',
29-
project_remap_path=TEST_REMAP,
30-
project_pedigree_path=TEST_PEDIGREE_3,
28+
project_guids=['R0113_test_project'],
29+
project_remap_paths=[TEST_REMAP],
30+
project_pedigree_paths=[TEST_PEDIGREE_3],
31+
project_i=0,
3132
skip_validation=True,
3233
)
3334
worker.add(upt_task)
@@ -134,9 +135,10 @@ def test_update_project_table_task_different_pedigree(self) -> None:
134135
run_id=TEST_RUN_ID,
135136
sample_type=SampleType.WGS,
136137
callset_path=TEST_VCF,
137-
project_guid='R0113_test_project',
138-
project_remap_path=TEST_REMAP,
139-
project_pedigree_path=TEST_PEDIGREE_3,
138+
project_guids=['R0113_test_project'],
139+
project_remap_paths=[TEST_REMAP],
140+
project_pedigree_paths=[TEST_PEDIGREE_3],
141+
project_i=0,
140142
skip_validation=True,
141143
)
142144
worker.add(upt_task)
@@ -147,9 +149,10 @@ def test_update_project_table_task_different_pedigree(self) -> None:
147149
run_id=TEST_RUN_ID,
148150
sample_type=SampleType.WGS,
149151
callset_path=TEST_VCF,
150-
project_guid='R0113_test_project',
151-
project_remap_path=TEST_REMAP,
152-
project_pedigree_path=TEST_PEDIGREE_3_DIFFERENT_FAMILIES,
152+
project_guids=['R0113_test_project'],
153+
project_remap_paths=[TEST_REMAP],
154+
project_pedigree_paths=[TEST_PEDIGREE_3_DIFFERENT_FAMILIES],
155+
project_i=0,
153156
skip_validation=True,
154157
)
155158
worker.add(upt_task)

v03_pipeline/lib/tasks/update_project_table_with_deleted_families.py

Lines changed: 37 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -2,14 +2,27 @@
22
import luigi
33

44
from v03_pipeline.lib.misc.family_entries import remove_family_guids
5-
from v03_pipeline.lib.tasks.base.base_update_project_table import (
6-
BaseUpdateProjectTableTask,
7-
)
5+
from v03_pipeline.lib.model import SampleType
6+
from v03_pipeline.lib.paths import project_table_path
7+
from v03_pipeline.lib.tasks.base.base_update import BaseUpdateTask
8+
from v03_pipeline.lib.tasks.files import GCSorLocalTarget
89

910

10-
class UpdateProjectTableWithDeletedFamiliesTask(BaseUpdateProjectTableTask):
11+
class UpdateProjectTableWithDeletedFamiliesTask(BaseUpdateTask):
12+
sample_type = luigi.EnumParameter(enum=SampleType)
13+
project_guid = luigi.Parameter()
1114
family_guids = luigi.ListParameter()
1215

16+
def output(self) -> luigi.Target:
    """Return the target for the project table identified by ``project_guid``.

    The table path is derived from the reference genome, dataset type,
    sample type and project guid of this task.
    """
    table_path = project_table_path(
        self.reference_genome,
        self.dataset_type,
        self.sample_type,
        self.project_guid,
    )
    return GCSorLocalTarget(table_path)
25+
1326
def complete(self) -> bool:
1427
return super().complete() and hl.eval(
1528
hl.bind(
@@ -26,6 +39,26 @@ def complete(self) -> bool:
2639
),
2740
)
2841

42+
def initialize_table(self) -> hl.Table:
    """Create an empty (zero-row) table with the project table's starting shape.

    The row type is the dataset type's key fields for this reference genome
    plus a ``filters`` set of strings, keyed by those key fields.  The
    globals are an empty ``family_guids`` array, an empty ``family_samples``
    dict and an empty ``updates`` set of
    ``{callset: str, remap_pedigree_hash: int32}`` structs.
    """
    key_type = self.dataset_type.table_key_type(self.reference_genome)
    empty_rows = []
    # NB: the row schema has no ``entries`` field because it is untyped
    # until we read the type off of the first callset aggregation.
    schema = hl.tstruct(
        **key_type,
        filters=hl.tset(hl.tstr),
    )
    return hl.Table.parallelize(
        empty_rows,
        schema,
        key=key_type.fields,
        globals=hl.Struct(
            family_guids=hl.empty_array(hl.tstr),
            family_samples=hl.empty_dict(hl.tstr, hl.tarray(hl.tstr)),
            updates=hl.empty_set(
                hl.tstruct(callset=hl.tstr, remap_pedigree_hash=hl.tint32),
            ),
        ),
    )
61+
2962
def update_table(self, ht: hl.Table) -> hl.Table:
3063
return remove_family_guids(
3164
ht,

v03_pipeline/lib/tasks/update_variant_annotations_table_with_new_samples.py

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -9,16 +9,16 @@
99
lookup_table_path,
1010
new_variants_table_path,
1111
)
12-
from v03_pipeline.lib.tasks.base.base_project_info_params import (
13-
BaseLoadingRunWithProjectInfoParams,
12+
from v03_pipeline.lib.tasks.base.base_loading_run_params import (
13+
BaseLoadingRunParams,
1414
)
1515
from v03_pipeline.lib.tasks.base.base_update_variant_annotations_table import (
1616
BaseUpdateVariantAnnotationsTableTask,
1717
)
1818
from v03_pipeline.lib.tasks.write_new_variants_table import WriteNewVariantsTableTask
1919

2020

21-
@luigi.util.inherits(BaseLoadingRunWithProjectInfoParams)
21+
@luigi.util.inherits(BaseLoadingRunParams)
2222
class UpdateVariantAnnotationsTableWithNewSamplesTask(
2323
BaseUpdateVariantAnnotationsTableTask,
2424
):

v03_pipeline/lib/tasks/validate_callset_test.py

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -62,6 +62,7 @@ def test_validate_callset_multiple_exceptions(
6262
# a NON_REF allele type at position chr1: 902024, missing
6363
# all contigs but chr1, and contains non-coding variants.
6464
callset_path=MULTIPLE_VALIDATION_EXCEPTIONS_VCF,
65+
project_guids=['project_a'],
6566
skip_validation=False,
6667
run_id=TEST_RUN_ID,
6768
)
@@ -74,6 +75,7 @@ def test_validate_callset_multiple_exceptions(
7475
dataset_type=DatasetType.SNV_INDEL,
7576
sample_type=SampleType.WES,
7677
callset_path=MULTIPLE_VALIDATION_EXCEPTIONS_VCF,
78+
project_guids=['project_a'],
7779
skip_validation=False,
7880
run_id=TEST_RUN_ID,
7981
)
@@ -82,6 +84,7 @@ def test_validate_callset_multiple_exceptions(
8284
self.assertDictEqual(
8385
json.load(f),
8486
{
87+
'project_guids': ['project_a'],
8588
'error_messages': [
8689
'Alleles with invalid allele <NON_REF> are present in the callset. This appears to be a GVCF containing records for sites with no variants.',
8790
"Variants are present multiple times in the callset: ['1-902088-G-A']",

0 commit comments

Comments
 (0)