Skip to content

Commit 4d51857

Browse files
committed
Merge branch 'benb/migration2' of github.com:broadinstitute/seqr-loading-pipelines into benb/lookup_table_refactor
2 parents 3a69e84 + c251883 commit 4d51857

File tree

1 file changed

+177
-0
lines changed

1 file changed

+177
-0
lines changed

v03_pipeline/bin/migration.py

Lines changed: 177 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,177 @@
1+
import json
2+
import os
3+
import subprocess
4+
5+
import hail as hl
6+
7+
from v03_pipeline.lib.misc.lookup import join_lookup_hts
8+
from v03_pipeline.lib.model import DatasetType, Env, ReferenceGenome
9+
from v03_pipeline.lib.model.constants import PROJECTS_EXCLUDED_FROM_LOOKUP
10+
11+
# Need to use the GCP bucket as temp storage for very large callset joins
12+
hl.init(tmp_dir='gs://seqr-scratch-temp', idempotent=True)
13+
14+
# Interval ref data join causes shuffle death, this prevents it
15+
hl._set_flags(use_new_shuffle='1', no_whole_stage_codegen='1') # noqa: SLF001
16+
sc = hl.spark_context()
17+
sc.addPyFile('gs://seqr-luigi/releases/dev/latest/pyscripts.zip')
18+
19+
Env.HAIL_TMPDIR = 'gs://seqr-scratch-temp'
20+
21+
MIGRATIONS = [
22+
(
23+
DatasetType.MITO,
24+
ReferenceGenome.GRCh38,
25+
),
26+
#(
27+
# DatasetType.SNV_INDEL,
28+
# ReferenceGenome.GRCh37,
29+
#),
30+
#(
31+
# DatasetType.SNV_INDEL,
32+
# ReferenceGenome.GRCh38,
33+
#),
34+
]
35+
36+
def build_sample_id_to_family_guid():
37+
sample_id_to_family_guid = {}
38+
for dataset_type, reference_genome in MIGRATIONS:
39+
projects = []
40+
path = f'gs://seqr-hail-search-data/v03/{reference_genome.value}/{dataset_type.value}/projects/'
41+
r = subprocess.run(
42+
['gsutil', 'ls', path],
43+
capture_output=True,
44+
text=True,
45+
check=True,
46+
)
47+
if 'matched no objects' in r.stderr:
48+
continue
49+
for line in r.stdout.strip().split('\n'):
50+
if line.endswith('projects/'):
51+
continue
52+
projects.append(line)
53+
for project_table_path in projects:
54+
print(project_table_path)
55+
ht = hl.read_table(project_table_path)
56+
project_guid = project_table_path.replace('.ht/', '').split('/')[-1]
57+
if project_guid in sample_id_to_family_guid:
58+
sample_id_to_family_guid[project_guid] = {
59+
**{sample_id: family_guid for family_guid, samples in hl.eval(ht.globals.family_samples).items() for sample_id in samples},
60+
**sample_id_to_family_guid[project_guid]
61+
}
62+
else:
63+
sample_id_to_family_guid = {
64+
project_guid: {sample_id: family_guid for family_guid, samples in hl.eval(ht.globals.family_samples).items() for sample_id in samples},
65+
**sample_id_to_family_guid
66+
}
67+
with open('sample_id_to_family_guid.json', 'w') as f:
68+
json.dump(sample_id_to_family_guid, f, ensure_ascii=False, indent=4)
69+
70+
return sample_id_to_family_guid
71+
72+
def initialize_table(dataset_type: DatasetType, reference_genome: ReferenceGenome) -> hl.Table:
73+
key_type = dataset_type.table_key_type(reference_genome)
74+
return hl.Table.parallelize(
75+
[],
76+
hl.tstruct(
77+
**key_type,
78+
project_stats=hl.tarray(
79+
hl.tarray(
80+
hl.tstruct(
81+
**{
82+
field: hl.tint32
83+
for field in dataset_type.lookup_table_fields_and_genotype_filter_fns
84+
},
85+
),
86+
),
87+
),
88+
),
89+
key=key_type.fields,
90+
globals=hl.Struct(
91+
project_guids=hl.empty_array(hl.tstr),
92+
project_families=hl.empty_dict(hl.tstr, hl.tarray(hl.tstr)),
93+
updates=hl.empty_set(hl.tstruct(callset=hl.tstr, project_guid=hl.tstr)),
94+
),
95+
)
96+
97+
98+
sample_id_to_family_guid = build_sample_id_to_family_guid()
99+
for dataset_type, reference_genome in MIGRATIONS:
100+
ht = initialize_table(dataset_type, reference_genome)
101+
sample_lookup_ht = hl.read_table(f'gs://seqr-hail-search-data/v03/{reference_genome.value}/{dataset_type.value}/lookup.ht')
102+
for project_guid in sample_lookup_ht.ref_samples[:3]:
103+
if project_guid in PROJECTS_EXCLUDED_FROM_LOOKUP:
104+
continue
105+
if project_guid not in sample_id_to_family_guid:
106+
print('Skipping', project_guid)
107+
continue
108+
print(project_guid)
109+
if dataset_type == DatasetType.MITO:
110+
project_lookup_ht = sample_lookup_ht.select(
111+
ref_samples=hl.array(sample_lookup_ht.ref_samples[project_guid]).map(lambda sample_id: hl.dict(sample_id_to_family_guid[project_guid])[sample_id]),
112+
heteroplasmic_samples=hl.array(sample_lookup_ht.heteroplasmic_samples[project_guid]).map(lambda sample_id: hl.dict(sample_id_to_family_guid[project_guid])[sample_id]),
113+
homoplasmic_samples=hl.array(sample_lookup_ht.homoplasmic_samples[project_guid]).map(lambda sample_id: hl.dict(sample_id_to_family_guid[project_guid])[sample_id]),
114+
family_guids=hl.sorted(list(sample_id_to_family_guid[project_guid].keys())),
115+
)
116+
project_lookup_ht = project_lookup_ht.filter(hl.len(project_lookup_ht.family_guids) > 0)
117+
project_lookup_ht = project_lookup_ht.select(
118+
project_stats = project_lookup_ht.family_guids.map(
119+
lambda family_guid: hl.Struct(
120+
ref_samples=hl.len(project_lookup_ht.ref_samples.filter(lambda f: f == family_guid)),
121+
heteroplasmic_samples=hl.len(project_lookup_ht.heteroplasmic_samples.filter(lambda f: f == family_guid)),
122+
homoplasmic_samples=hl.len(project_lookup_ht.homoplasmic_samples.filter(lambda f: f == family_guid)),
123+
),
124+
),
125+
family_guids=project_lookup_ht.family_guids,
126+
)
127+
project_lookup_ht = project_lookup_ht.annotate(
128+
project_stats = [project_lookup_ht.project_stats.map(
129+
lambda x: hl.or_missing(
130+
((x.ref_samples > 0) | (x.heteroplasmic_samples > 0) | (x.homoplasmic_samples > 0)),
131+
x
132+
),
133+
)]
134+
)
135+
else:
136+
project_lookup_ht = sample_lookup_ht.select(
137+
ref_samples=hl.array(sample_lookup_ht.ref_samples[project_guid]).map(lambda sample_id: hl.dict(sample_id_to_family_guid[project_guid])[sample_id]),
138+
het_samples=hl.array(sample_lookup_ht.het_samples[project_guid]).map(lambda sample_id: hl.dict(sample_id_to_family_guid[project_guid])[sample_id]),
139+
hom_samples=hl.array(sample_lookup_ht.hom_samples[project_guid]).map(lambda sample_id: hl.dict(sample_id_to_family_guid[project_guid])[sample_id]),
140+
family_guids=hl.sorted(list(sample_id_to_family_guid[project_guid].keys())),
141+
)
142+
project_lookup_ht = project_lookup_ht.filter(
143+
hl.len(project_lookup_ht.family_guids) > 0
144+
)
145+
project_lookup_ht = project_lookup_ht.select(
146+
project_stats=project_lookup_ht.family_guids.map(
147+
lambda family_guid: hl.Struct(
148+
ref_samples=hl.len(project_lookup_ht.ref_samples.filter(lambda f: f == family_guid)),
149+
het_samples=hl.len(project_lookup_ht.het_samples.filter(lambda f: f == family_guid)),
150+
hom_samples=hl.len(project_lookup_ht.hom_samples.filter(lambda f: f == family_guid)),
151+
)
152+
),
153+
family_guids=project_lookup_ht.family_guids,
154+
)
155+
project_lookup_ht = project_lookup_ht.annotate(
156+
project_stats = [project_lookup_ht.project_stats.map(
157+
lambda x: hl.or_missing(
158+
((x.ref_samples > 0) | (x.het_samples > 0) | (x.hom_samples > 0)),
159+
x
160+
),
161+
)]
162+
)
163+
family_guids = project_lookup_ht.family_guids.take(1)
164+
project_lookup_ht = project_lookup_ht.select_globals(
165+
project_guids=[project_guid],
166+
project_families={project_guid: family_guids[0]},
167+
)
168+
project_lookup_ht = project_lookup_ht.select('project_stats')
169+
ht = join_lookup_hts(ht, project_lookup_ht)
170+
print(ht.count())
171+
print(ht.distinct().count())
172+
ht = ht.annotate_globals(
173+
updates=sample_lookup_ht.index_globals().updates
174+
)
175+
ht.write('gs://seqr-scratch-temp/mito_new_lookup.ht')
176+
177+

0 commit comments

Comments
 (0)