1
1
from datetime import datetime
2
+ from types import FunctionType
2
3
3
4
import hail as hl
4
5
import pytz
11
12
from v03_pipeline .lib .reference_data .config import CONFIG
12
13
13
14
14
- def parse_version (ht : hl .Table , dataset : str , config : dict ) -> hl .StringExpression :
15
- annotated_version = ht .globals .get ('version' , hl .missing (hl .tstr ))
16
- config_version = config .get ('version' , hl .missing (hl .tstr ))
17
- return (
18
- hl .case ()
19
- .when (hl .is_missing (config_version ), annotated_version )
20
- .when (hl .is_missing (annotated_version ), config_version )
21
- .when (annotated_version == config_version , config_version )
22
- .or_error (
23
- f'found mismatching versions for dataset { dataset } , { config_version } , { hl .eval (annotated_version )} ' ,
15
def update_or_create_joined_ht(
    reference_dataset_collection: ReferenceDatasetCollection,
    dataset_type: DatasetType,
    reference_genome: ReferenceGenome,
    dataset: str | None,
    joined_ht: hl.Table,
) -> hl.Table:
    """
    Refresh one dataset, or every dataset of the collection, inside joined_ht.

    When `dataset` is None, all datasets returned by
    `reference_dataset_collection.datasets(dataset_type)` are refreshed;
    otherwise only the named dataset is. Each refreshed dataset is rebuilt
    with `get_dataset_ht`, any existing row field of the same name is dropped,
    the new table is outer-joined in, and `annotate_dataset_globals` is applied.

    The returned table keeps only rows where at least one of the collection's
    dataset fields is non-missing.
    """
    if dataset is None:
        names_to_refresh = reference_dataset_collection.datasets(dataset_type)
    else:
        names_to_refresh = [dataset]

    for name in names_to_refresh:
        fresh_ht = get_dataset_ht(name, reference_genome)

        # Remove the previous copy of this dataset so the join does not collide
        # with an existing row field of the same name.
        if name in joined_ht.row:
            joined_ht = joined_ht.drop(name)

        joined_ht = joined_ht.join(fresh_ht, 'outer')
        joined_ht = annotate_dataset_globals(joined_ht, name, fresh_ht)

    # A row is kept when any dataset of the collection still annotates it.
    has_any_annotation = hl.any(
        [
            ~hl.is_missing(joined_ht[name])
            for name in reference_dataset_collection.datasets(dataset_type)
        ],
    )
    return joined_ht.filter(has_any_annotation)
45
+
46
+
47
def get_dataset_ht(
    dataset: str,
    reference_genome: ReferenceGenome,
) -> hl.Table:
    """
    Build the table for a single reference dataset from its CONFIG entry.

    The table is loaded through the config's 'custom_import' callable (given
    'source_path' and the reference genome) when one is configured, otherwise
    read from 'path'. It is then restricted to the reference genome's standard
    contigs (when it has a `locus` attribute), filtered by the optional
    'filter' callable, and reduced to the configured select / custom_select /
    enum_select fields. Globals are replaced with `path`, `version` and
    `enums`. All non-key row fields are finally nested under a single struct
    field named after `dataset`, and duplicate keys are removed with
    `distinct()`.
    """
    config = CONFIG[dataset][reference_genome.v02_value]

    if 'custom_import' in config:
        ht = config['custom_import'](config['source_path'], reference_genome)
    else:
        ht = hl.read_table(config['path'])

    if hasattr(ht, 'locus'):
        standard_contigs = hl.set(reference_genome.standard_contigs)
        ht = ht.filter(standard_contigs.contains(ht.locus.contig))

    if 'filter' in config:
        ht = ht.filter(config['filter'](ht))

    selected_fields = {
        **get_select_fields(config.get('select'), ht),
        **get_custom_select_fields(config.get('custom_select'), ht),
    }
    ht = ht.select(**selected_fields)
    ht = ht.transmute(**get_enum_select_fields(config.get('enum_select'), ht))

    # Globals are computed in the same order as the keyword arguments they feed.
    source = config['source_path'] if 'custom_import' in config else config['path']
    version = parse_dataset_version(ht, dataset, config)
    enums = hl.Struct(
        **config.get(
            'enum_select',
            hl.missing(hl.tstruct(hl.tstr, hl.tarray(hl.tstr))),
        ),
    )
    ht = ht.select_globals(path=source, version=version, enums=enums)

    # Nest every non-key field under one struct named after the dataset.
    nested_row = {dataset: ht.row.drop(*ht.key)}
    return ht.select(**nested_row).distinct()
26
81
27
82
28
- def get_select_fields (selects , base_ht ) :
83
+ def get_select_fields (selects : list | dict | None , base_ht : hl . Table ) -> dict :
29
84
"""
30
85
Generic function that takes in a select config and base_ht and generates a
31
86
select dict that is generated from traversing the base_ht and extracting the right
@@ -57,13 +112,13 @@ def get_select_fields(selects, base_ht):
57
112
return select_fields
58
113
59
114
60
- def get_custom_select_fields (custom_select , ht ) :
115
def get_custom_select_fields(custom_select: FunctionType | None, ht: hl.Table) -> dict:
    """
    Return the select fields produced by an optional custom select callable.

    An empty dict is returned when `custom_select` is None; otherwise the
    callable is invoked with `ht` and its result is returned unchanged.
    """
    return {} if custom_select is None else custom_select(ht)
64
119
65
120
66
- def get_enum_select_fields (enum_selects , ht ) :
121
+ def get_enum_select_fields (enum_selects : dict | None , ht : hl . Table ) -> dict :
67
122
enum_select_fields = {}
68
123
if enum_selects is None :
69
124
return enum_select_fields
@@ -89,40 +144,22 @@ def get_enum_select_fields(enum_selects, ht):
89
144
return enum_select_fields
90
145
91
146
92
- def get_ht (
147
def parse_dataset_version(
    ht: hl.Table,
    dataset: str,
    config: dict,
) -> hl.StringExpression:
    """
    Reconcile the version stored in the table globals with the config version.

    The result is a Hail expression that yields:
      - the globals version when the config has no 'version',
      - the config version when the globals have no 'version',
      - the shared value when both are present and equal,
      - an error (raised when the expression is evaluated) when they differ.

    The error message is built as a Hail expression rather than an f-string,
    so calling this function no longer forces an eager `hl.eval` of the
    table globals, and both versions are rendered by value in the message.
    """
    annotated_version = ht.globals.get('version', hl.missing(hl.tstr))
    config_version = config.get('version', hl.missing(hl.tstr))

    # Only evaluated in the error branch, where both versions are non-missing.
    mismatch_message = hl.format(
        'found mismatching versions for dataset %s, %s, %s',
        dataset,
        config_version,
        annotated_version,
    )
    return (
        hl.case()
        .when(hl.is_missing(config_version), annotated_version)
        .when(hl.is_missing(annotated_version), config_version)
        .when(annotated_version == config_version, config_version)
        .or_error(mismatch_message)
    )
114
- ht = ht .transmute (** get_enum_select_fields (config .get ('enum_select' ), ht ))
115
- ht = ht .select_globals (
116
- path = (config ['source_path' ] if 'custom_import' in config else config ['path' ]),
117
- version = parse_version (ht , dataset , config ),
118
- enums = hl .Struct (
119
- ** config .get (
120
- 'enum_select' ,
121
- hl .missing (hl .tstruct (hl .tstr , hl .tarray (hl .tstr ))),
122
- ),
123
- ),
124
- )
125
- return ht .select (** {dataset : ht .row .drop (* ht .key )}).distinct ()
126
163
127
164
128
165
def annotate_dataset_globals (joined_ht : hl .Table , dataset : str , dataset_ht : hl .Table ):
@@ -153,7 +190,7 @@ def join_hts(
153
190
),
154
191
)
155
192
for dataset in reference_dataset_collection .datasets (dataset_type ):
156
- dataset_ht = get_ht (dataset , reference_genome )
193
+ dataset_ht = get_dataset_ht (dataset , reference_genome )
157
194
joined_ht = joined_ht .join (dataset_ht , 'outer' )
158
195
joined_ht = annotate_dataset_globals (joined_ht , dataset , dataset_ht )
159
196
return joined_ht
@@ -167,7 +204,7 @@ def update_existing_joined_hts(
167
204
reference_dataset_collection : ReferenceDatasetCollection ,
168
205
):
169
206
joined_ht = hl .read_table (destination_path )
170
- dataset_ht = get_ht (dataset , reference_genome )
207
+ dataset_ht = get_dataset_ht (dataset , reference_genome )
171
208
joined_ht = joined_ht .drop (dataset )
172
209
joined_ht = joined_ht .join (dataset_ht , 'outer' )
173
210
joined_ht = joined_ht .filter (
0 commit comments