
Commit faa824e

Add Checkpointing and QC Collection to GlimpseImputation (#161)

* Also changed CRAM localization to samtools, since GLIMPSE2 cannot handle CRAMs and their indices in different directories

Co-authored-by: Christopher Kachulis <ckachuli@broadinstitute.org>

1 parent 9d76d93 · commit faa824e

10 files changed: +736 -83 lines

.dockstore.yml

Lines changed: 3 additions & 0 deletions

@@ -84,6 +84,9 @@ workflows:
   - name: Glimpse2ImputationInBatches
     subclass: WDL
     primaryDescriptorPath: /GlimpseImputationPipeline/Glimpse2ImputationInBatches.wdl
+  - name: Glimpse2CheckQC
+    subclass: WDL
+    primaryDescriptorPath: /GlimpseImputationPipeline/Glimpse2CheckQC.wdl
   - name: RNAMetrics
     subclass: WDL
     primaryDescriptorPath: /Utilities/WDLs/RNAMetrics.wdl
GlimpseImputationPipeline/Glimpse2CheckQC.wdl

Lines changed: 79 additions & 0 deletions

@@ -0,0 +1,79 @@
+version 1.0
+
+workflow Glimpse2CheckQC {
+    input {
+        File qc_metrics
+        File qc_metrics_thresholds
+        String output_basename
+
+        Int preemptible = 3
+        String docker = "us.gcr.io/broad-dsde-methods/python-data-slim:1.0"
+        Int cpu = 1
+        Int mem_gb = 4
+    }
+
+    call Glimpse2CheckQCTask {
+        input:
+            qc_metrics = qc_metrics,
+            qc_metrics_thresholds = qc_metrics_thresholds,
+            output_basename = output_basename,
+            preemptible = preemptible,
+            docker = docker,
+            cpu = cpu,
+            mem_gb = mem_gb
+    }
+
+    output {
+        Boolean qc_passed = Glimpse2CheckQCTask.qc_passed
+        File qc_failures = Glimpse2CheckQCTask.qc_failures
+    }
+}
+
+task Glimpse2CheckQCTask {
+    input {
+        File qc_metrics
+        File qc_metrics_thresholds
+        String output_basename
+
+        Int preemptible
+        String docker
+        Int cpu
+        Int mem_gb
+    }
+
+    Int disk_size_gb = 10
+
+    command <<<
+        set -euo pipefail
+
+        cat <<'EOF' > script.py
+import pandas as pd
+
+# Load the per-sample metrics and the allowed [min, max] range for each metric
+data = pd.read_csv('~{qc_metrics}', sep='\t')
+qc_metric_thresholds = pd.read_csv('~{qc_metrics_thresholds}', sep='\t')
+
+# Reshape to one row per (sample, metric) pair and attach the thresholds
+data = data.melt(id_vars=['s'], var_name='metric', value_name='value', value_vars=qc_metric_thresholds.metric)
+data = data.merge(qc_metric_thresholds, on=['metric'])
+
+# Any value outside its [min, max] range is a QC failure
+samples_out_of_spec = data.loc[(data['value'] < data['min']) | (data['value'] > data['max'])].sort_values(['s', 'metric'])
+samples_out_of_spec.rename(columns={'s': 'sample_id'}).to_csv('~{output_basename}.qc_failures.tsv', sep='\t', index=False)
+
+with open('~{output_basename}.qc_passed.txt', 'w') as qc_passed:
+    qc_passed.write('true\n' if len(samples_out_of_spec) == 0 else 'false\n')
+EOF
+        python3 script.py
+    >>>
+
+    runtime {
+        docker: docker
+        disks: "local-disk " + disk_size_gb + " HDD"
+        memory: mem_gb + " GiB"
+        cpu: cpu
+        preemptible: preemptible
+    }
+
+    output {
+        Boolean qc_passed = read_boolean("~{output_basename}.qc_passed.txt")
+        File qc_failures = "~{output_basename}.qc_failures.tsv"
+    }
+}
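
The layout of qc_metrics_thresholds is not spelled out in this commit, but the pandas logic above (value_vars=qc_metric_thresholds.metric, then comparing value against min and max after the merge) implies a tab-separated file with metric, min, and max columns. A hypothetical example, using two metric names that Hail's sample_qc emits:

metric	min	max
call_rate	0.95	1.0
r_het_hom_var	1.0	3.5

Any sample whose value falls outside its [min, max] window lands in <output_basename>.qc_failures.tsv, and qc_passed is written as true only when that table is empty.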

GlimpseImputationPipeline/Glimpse2Imputation.wdl

Lines changed: 225 additions & 11 deletions
@@ -21,17 +21,48 @@ workflow Glimpse2Imputation {
         Int? n_burnin
         Int? n_main
         Int? effective_population_size
+
+        Boolean collect_qc_metrics = true

-        Int preemptible = 1
-        String docker = "us.gcr.io/broad-dsde-methods/glimpse:palantir-workflows_20c9de0"
-        Int cpu_phase = 4
-        Int mem_gb_phase = 16
+        Int preemptible = 9
+        String docker = "us.gcr.io/broad-dsde-methods/glimpse:odelaneau_f310862"
+        String docker_extract_num_sites_from_reference_chunk = "us.gcr.io/broad-dsde-methods/glimpse_extract_num_sites_from_reference_chunks:michaelgatzen_edc7f3a"
         Int cpu_ligate = 4
         Int mem_gb_ligate = 4
         File? monitoring_script
     }

     scatter (reference_chunk in read_lines(reference_chunks)) {
+        call GetNumberOfSitesInChunk {
+            input:
+                reference_chunk = reference_chunk,
+                docker = docker_extract_num_sites_from_reference_chunk
+        }
+
+        Int n_rare = GetNumberOfSitesInChunk.n_rare
+        Int n_common = GetNumberOfSitesInChunk.n_common
+
+        if (defined(input_vcf)) {
+            call CountSamples {
+                input:
+                    vcf = select_first([input_vcf])
+            }
+        }
+
+        Int n_samples = select_first([CountSamples.nSamples, length(select_first([crams]))])
+
+        call SelectResourceParameters {
+            input:
+                n_rare = n_rare,
+                n_common = n_common,
+                n_samples = n_samples
+        }
+
+        if (SelectResourceParameters.memory_gb > 256 || SelectResourceParameters.request_n_cpus > 32) {
+            # force failure if we're accidentally going to request too many resources and spend too much money
+            Int safety_check_memory_gb = -1
+            Int safety_check_n_cpu = -1
+        }
         call GlimpsePhase {
             input:
                 reference_chunk = reference_chunk,
@@ -49,8 +80,8 @@ workflow Glimpse2Imputation {
                 fasta_index = fasta_index,
                 preemptible = preemptible,
                 docker = docker,
-                cpu = cpu_phase,
-                mem_gb = mem_gb_phase,
+                cpu = select_first([safety_check_n_cpu, SelectResourceParameters.request_n_cpus]),
+                mem_gb = select_first([safety_check_memory_gb, SelectResourceParameters.memory_gb]),
                 monitoring_script = monitoring_script
         }
     }
@@ -68,9 +99,21 @@ workflow Glimpse2Imputation {
             monitoring_script = monitoring_script
     }

+    if (collect_qc_metrics) {
+        call CollectQCMetrics {
+            input:
+                imputed_vcf = GlimpseLigate.imputed_vcf,
+                output_basename = output_basename,
+                monitoring_script = monitoring_script
+        }
+    }
+
     output {
         File imputed_vcf = GlimpseLigate.imputed_vcf
         File imputed_vcf_index = GlimpseLigate.imputed_vcf_index
+
+        File? qc_metrics = CollectQCMetrics.qc_metrics
+
         Array[File?] glimpse_phase_monitoring = GlimpsePhase.monitoring
         File? glimpse_ligate_monitoring = GlimpseLigate.monitoring
     }
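
The File? type on qc_metrics is forced by WDL's conditional semantics: when a call sits inside an if block, its outputs become optional outside that block. A minimal self-contained sketch of the pattern (workflow and task names hypothetical, not part of this pipeline):

version 1.0

workflow OptionalOutputExample {
    input {
        Boolean do_thing = true
    }
    if (do_thing) {
        call DoThing
    }
    output {
        # File? because DoThing may not have run
        File? maybe_result = DoThing.result
    }
}

task DoThing {
    command <<< echo "done" > result.txt >>>
    runtime { docker: "ubuntu:22.04" }
    output { File result = "result.txt" }
}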
@@ -95,8 +138,8 @@ task GlimpsePhase {

         Int mem_gb = 4
         Int cpu = 4
-        Int disk_size_gb = ceil(2.2 * size(input_vcf, "GiB") + size(reference_chunk, "GiB") + 100)
-        Int preemptible = 1
+        Int disk_size_gb = ceil(2.2 * size(input_vcf, "GiB") + size(reference_chunk, "GiB") + 0.003 * length(select_first([crams, []])) + 10)
+        Int preemptible = 9
         Int max_retries = 3
         String docker
         File? monitoring_script
@@ -117,16 +160,27 @@ task GlimpsePhase {

         export GCS_OAUTH_TOKEN=$(/root/google-cloud-sdk/bin/gcloud auth application-default print-access-token)

+        seq_cache_populate.pl -root ./ref/cache ~{fasta}
+        export REF_PATH=:
+        export REF_CACHE=./ref/cache/%2s/%2s/%s
+
         ~{"bash " + monitoring_script + " > monitoring.log &"}

         cram_paths=( ~{sep=" " crams} )
+        cram_index_paths=( ~{sep=" " cram_indices} )
         sample_ids=( ~{sep=" " sample_ids} )

+        chunk_region=$(echo "~{reference_chunk}" | sed 's/^.*chr/chr/' | sed 's/\.bin//' | sed 's/_/:/1' | sed 's/_/-/1')
+
+        echo "Region for CRAM extraction: ${chunk_region}"
         for i in "${!cram_paths[@]}" ; do
-            echo -e "${cram_paths[$i]} ${sample_ids[$i]}" >> crams.list
+            samtools view -h -C -X -T ~{fasta} -o cram${i}.cram "${cram_paths[$i]}" "${cram_index_paths[$i]}" ${chunk_region}
+            samtools index cram${i}.cram
+            echo -e "cram${i}.cram ${sample_ids[$i]}" >> crams.list
+            echo "Processed CRAM ${i}: ${cram_paths[$i]} -> cram${i}.cram"
         done

-        /bin/GLIMPSE2_phase \
+        cmd="/bin/GLIMPSE2_phase \
             ~{"--input-gl " + input_vcf} \
             --reference ~{reference_chunk} \
             --output phase_output.bcf \
@@ -135,7 +189,14 @@ task GlimpsePhase {
             ~{"--burnin " + n_burnin} ~{"--main " + n_main} \
             ~{"--ne " + effective_population_size} \
             ~{bam_file_list_input} \
-            ~{"--fasta " + fasta}
+            ~{"--fasta " + fasta} \
+            --checkpoint-file-out checkpoint.bin"
+
+        if [ -s "checkpoint.bin" ]; then
+            cmd="$cmd --checkpoint-file-in checkpoint.bin"
+        fi
+
+        eval $cmd
     >>>

     runtime {
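
The chunk_region parsing above leans on the reference chunk file naming convention (…chr<contig>_<start>_<end>.bin). For a hypothetical chunk path, the sed chain resolves as follows:

$ echo "gs://bucket/reference_chunk_chr20_1000000_2000000.bin" | sed 's/^.*chr/chr/' | sed 's/\.bin//' | sed 's/_/:/1' | sed 's/_/-/1'
chr20:1000000-2000000

Each CRAM is then subset to just that region with samtools view -C (write CRAM output) and -X (pass an explicit index path after the input file), which is what lets the task point samtools at a CRAM and its index in different locations.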
@@ -145,6 +206,7 @@ task GlimpsePhase {
         cpu: cpu
         preemptible: preemptible
         maxRetries: max_retries
+        checkpointFile: "checkpoint.bin"
     }

     output {
@@ -202,3 +264,155 @@ task GlimpseLigate {
         File? monitoring = "monitoring.log"
     }
 }
+
+task CollectQCMetrics {
+    input {
+        File imputed_vcf
+        String output_basename
+
+        Int preemptible = 1
+        String docker = "hailgenetics/hail:0.2.126-py3.11"
+        Int cpu = 4
+        Int mem_gb = 16
+        File? monitoring_script
+    }
+
+    parameter_meta {
+        imputed_vcf: {
+            localization_optional: true
+        }
+    }
+
+    Int disk_size_gb = 100
+
+    command <<<
+        set -euo pipefail
+
+        ~{"bash " + monitoring_script + " > monitoring.log &"}
+
+        cat <<'EOF' > script.py
+import hail as hl
+import pandas as pd
+
+# Calculate metrics
+hl.init(default_reference='GRCh38', idempotent=True)
+vcf = hl.import_vcf('~{imputed_vcf}', force_bgz=True)
+qc = hl.sample_qc(vcf)
+qc.cols().flatten().rename({'sample_qc.' + col: col for col in list(qc['sample_qc'])}).export('~{output_basename}.qc_metrics.tsv')
+EOF
+        python3 script.py
+    >>>
+
+    runtime {
+        docker: docker
+        disks: "local-disk " + disk_size_gb + " HDD"
+        memory: mem_gb + " GiB"
+        cpu: cpu
+        preemptible: preemptible
+    }
+
+    output {
+        File qc_metrics = "~{output_basename}.qc_metrics.tsv"
+        File? monitoring = "monitoring.log"
+    }
+}
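
CollectQCMetrics is what feeds Glimpse2CheckQC: Hail keys the cols() table on the sample column 's', so the exported TSV should start with an 's' column followed by one column per flattened sample_qc field. A hypothetical first two rows (metric values invented):

s	call_rate	n_called	n_het
NA12878	0.999	4182745	291833

That 's' column is exactly what the Glimpse2CheckQC pandas script melts on (id_vars=['s']) before comparing each metric against its thresholds.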
+
+task GetNumberOfSitesInChunk {
+    input {
+        File reference_chunk
+
+        String docker
+        Int mem_gb = 4
+        Int cpu = 4
+        Int disk_size_gb = ceil(size(reference_chunk, "GiB") + 10)
+        Int preemptible = 1
+        Int max_retries = 3
+    }
+
+    command <<<
+        set -xeuo pipefail
+        /bin/GLIMPSE2_extract_num_sites_from_reference_chunk ~{reference_chunk} > n_sites.txt
+        cat n_sites.txt
+        grep "Lrare" n_sites.txt | sed 's/Lrare=//' > n_rare.txt
+        grep "Lcommon" n_sites.txt | sed 's/Lcommon=//' > n_common.txt
+    >>>
+
+    runtime {
+        docker: docker
+        disks: "local-disk " + disk_size_gb + " HDD"
+        memory: mem_gb + " GiB"
+        cpu: cpu
+        preemptible: preemptible
+        maxRetries: max_retries
+    }
+
+    output {
+        Int n_rare = read_int("n_rare.txt")
+        Int n_common = read_int("n_common.txt")
+    }
+}
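
The exact output of GLIMPSE2_extract_num_sites_from_reference_chunk is not shown in this commit, but the grep/sed pair above implies n_sites.txt contains lines of the form below (values invented for illustration):

Lrare=815763
Lcommon=39381

read_int then turns the stripped values into the n_rare and n_common outputs consumed by SelectResourceParameters.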
+
+task CountSamples {
+    input {
+        File vcf
+
+        String bcftools_docker = "us.gcr.io/broad-gotc-prod/imputation-bcf-vcf:1.0.7-1.10.2-0.1.16-1669908889"
+        Int cpu = 1
+        Int memory_mb = 3000
+        Int disk_size_gb = 10 + ceil(size(vcf, "GiB"))
+    }
+
+    command <<<
+        bcftools query -l ~{vcf} | wc -l
+    >>>
+
+    runtime {
+        docker: bcftools_docker
+        disks: "local-disk ${disk_size_gb} HDD"
+        memory: "${memory_mb} MiB"
+        cpu: cpu
+    }
+    output {
+        Int nSamples = read_int(stdout())
+    }
+}
+
+task SelectResourceParameters {
+    input {
+        Int n_rare
+        Int n_common
+        Int n_samples
+    }
+
+    command <<<
+        python3 << EOF
+import math
+n_rare = ~{n_rare}
+n_common = ~{n_common}
+n_samples = ~{n_samples}
+n_sites = n_common + n_rare
+
+# try to keep expected runtime under 4 hours, but don't ask for more than 32 CPUs or 256 GB of memory
+estimated_needed_threads = min(math.ceil(5e-6*n_sites*n_samples/240), 32)
+estimated_needed_memory_gb = min(math.ceil((800e-3 + 0.97e-6 * n_rare * estimated_needed_threads + 14.6e-6 * n_common * estimated_needed_threads + 6.5e-9 * (n_rare + n_common) * n_samples + 13.7e-3 * n_samples + 1.8e-6*(n_rare + n_common)*math.log(n_samples))), 256)
+# recalculate allowable threads; some additional threads may be available due to rounding memory up
+threads_to_use = max(math.floor((estimated_needed_memory_gb - (800e-3 + 6.5e-9 * (n_rare + n_common) * n_samples + 13.7e-3 * n_samples + 1.8e-6*(n_rare + n_common)*math.log(n_samples)))/(0.97e-6 * n_rare + 14.6e-6 * n_common)), 1)
+
+with open("n_cpus_request.txt", "w") as f_cpus_request:
+    f_cpus_request.write(f'{int(threads_to_use)}')
+
+with open("memory_gb.txt", "w") as f_mem:
+    f_mem.write(f'{int(estimated_needed_memory_gb)}')
+EOF
+    >>>
+
+    runtime {
+        docker: "us.gcr.io/broad-dsde-methods/python-data-slim:1.0"
+    }
+
+    output {
+        Int memory_gb = read_int("memory_gb.txt")
+        Int request_n_cpus = read_int("n_cpus_request.txt")
+    }
+}
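
To make the heuristic concrete, the same arithmetic can be evaluated outside WDL for one hypothetical chunk (all three counts are invented for illustration; the formulas and caps are copied from the task above):

import math

# hypothetical chunk: 300k rare sites, 30k common sites, 10k samples
n_rare, n_common, n_samples = 300_000, 30_000, 10_000
n_sites = n_rare + n_common

# threads: keep the runtime estimate under the 4-hour (240-minute) target, cap at 32
threads = min(math.ceil(5e-6 * n_sites * n_samples / 240), 32)            # -> 32

# memory: same polynomial as SelectResourceParameters, capped at 256 GB
mem_gb = min(math.ceil(800e-3
                       + 0.97e-6 * n_rare * threads
                       + 14.6e-6 * n_common * threads
                       + 6.5e-9 * n_sites * n_samples
                       + 13.7e-3 * n_samples
                       + 1.8e-6 * n_sites * math.log(n_samples)), 256)    # -> 189

print(threads, mem_gb)   # 32 189

Note that the recalculated threads_to_use in the task has no explicit 32-thread cap, which is presumably why the workflow-level safety check (deliberately requesting -1 CPUs and memory to force a failure) guards the GlimpsePhase call.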
