Skip to content

Commit db64b2d

Browse files
authored
feat: Add rsync task between hail search data/reference data stored in gcs and on local disk. (#1006)
* Support gcs dirs in rsync * ws * Add create dataproc cluster task * add dataproc * ruff * requirements * still struggling * Gencode refactor to remove gcs * bump reqs * Run dataproc job * lib * running * merge requirements * Flip'em * Better exception handling * Cleaner approach if less generalizable * write a test * Fix tests * lint * Add test for success * refactor to use a base class... better for adding support for multiple jobs * cleanup * ruff * Fix missing mock * Fix flapping test * first commit * Finish test and cleanup * Allow any order
1 parent a6ef086 commit db64b2d

File tree

5 files changed

+237
-40
lines changed

5 files changed

+237
-40
lines changed

v03_pipeline/lib/model/environment.py

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -50,6 +50,8 @@
5050
GCLOUD_ZONE = os.environ.get('GCLOUD_ZONE')
5151
GCLOUD_REGION = os.environ.get('GCLOUD_REGION')
5252
PIPELINE_RUNNER_APP_VERSION = os.environ.get('PIPELINE_RUNNER_APP_VERSION', 'latest')
53+
SEQR_APP_HAIL_SEARCH_DATA_DIR = os.environ.get('SEQR_APP_HAIL_SEARCH_DATA_DIR')
54+
SEQR_APP_REFERENCE_DATASETS_DIR = os.environ.get('SEQR_APP_REFERENCE_DATASETS_DIR')
5355

5456

5557
@dataclass
@@ -71,4 +73,6 @@ class Env:
7173
PIPELINE_RUNNER_APP_VERSION: str = PIPELINE_RUNNER_APP_VERSION
7274
PRIVATE_REFERENCE_DATASETS_DIR: str = PRIVATE_REFERENCE_DATASETS_DIR
7375
REFERENCE_DATASETS_DIR: str = REFERENCE_DATASETS_DIR
76+
SEQR_APP_HAIL_SEARCH_DATA_DIR: str | None = SEQR_APP_HAIL_SEARCH_DATA_DIR
77+
SEQR_APP_REFERENCE_DATASETS_DIR: str | None = SEQR_APP_REFERENCE_DATASETS_DIR
7478
VEP_REFERENCE_DATASETS_DIR: str = VEP_REFERENCE_DATASETS_DIR

v03_pipeline/lib/paths.py

Lines changed: 22 additions & 40 deletions
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,7 @@
1717
)
1818

1919

20-
def _pipeline_prefix(
20+
def pipeline_prefix(
2121
root: str,
2222
reference_genome: ReferenceGenome,
2323
dataset_type: DatasetType,
@@ -36,38 +36,15 @@ def _pipeline_prefix(
3636
)
3737

3838

39-
def _v03_reference_data_prefix(
40-
access_control: AccessControl,
41-
reference_genome: ReferenceGenome,
42-
dataset_type: DatasetType,
43-
) -> str:
44-
root = (
45-
Env.PRIVATE_REFERENCE_DATASETS_DIR
46-
if access_control == AccessControl.PRIVATE
47-
else Env.REFERENCE_DATASETS_DIR
48-
)
49-
if FeatureFlag.INCLUDE_PIPELINE_VERSION_IN_PREFIX:
50-
return os.path.join(
51-
root,
52-
PipelineVersion.V03.value,
53-
reference_genome.value,
54-
dataset_type.value,
55-
)
56-
return os.path.join(
57-
root,
58-
reference_genome.value,
59-
dataset_type.value,
60-
)
61-
62-
6339
def _v03_reference_dataset_prefix(
40+
root: str,
6441
access_control: AccessControl,
6542
reference_genome: ReferenceGenome,
6643
) -> str:
6744
root = (
6845
Env.PRIVATE_REFERENCE_DATASETS_DIR
6946
if access_control == AccessControl.PRIVATE
70-
else Env.REFERENCE_DATASETS_DIR
47+
else root
7148
)
7249
if FeatureFlag.INCLUDE_PIPELINE_VERSION_IN_PREFIX:
7350
return os.path.join(
@@ -88,7 +65,7 @@ def family_table_path(
8865
family_guid: str,
8966
) -> str:
9067
return os.path.join(
91-
_pipeline_prefix(
68+
pipeline_prefix(
9269
Env.HAIL_SEARCH_DATA_DIR,
9370
reference_genome,
9471
dataset_type,
@@ -104,7 +81,7 @@ def tdr_metrics_dir(
10481
dataset_type: DatasetType,
10582
) -> str:
10683
return os.path.join(
107-
_pipeline_prefix(
84+
pipeline_prefix(
10885
Env.LOADING_DATASETS_DIR,
10986
reference_genome,
11087
dataset_type,
@@ -130,7 +107,7 @@ def imported_callset_path(
130107
callset_path: str,
131108
) -> str:
132109
return os.path.join(
133-
_pipeline_prefix(
110+
pipeline_prefix(
134111
Env.LOADING_DATASETS_DIR,
135112
reference_genome,
136113
dataset_type,
@@ -177,7 +154,7 @@ def project_table_path(
177154
project_guid: str,
178155
) -> str:
179156
return os.path.join(
180-
_pipeline_prefix(
157+
pipeline_prefix(
181158
Env.HAIL_SEARCH_DATA_DIR,
182159
reference_genome,
183160
dataset_type,
@@ -194,7 +171,7 @@ def relatedness_check_table_path(
194171
callset_path: str,
195172
) -> str:
196173
return os.path.join(
197-
_pipeline_prefix(
174+
pipeline_prefix(
198175
Env.LOADING_DATASETS_DIR,
199176
reference_genome,
200177
dataset_type,
@@ -210,7 +187,7 @@ def relatedness_check_tsv_path(
210187
callset_path: str,
211188
) -> str:
212189
return os.path.join(
213-
_pipeline_prefix(
190+
pipeline_prefix(
214191
Env.LOADING_DATASETS_DIR,
215192
reference_genome,
216193
dataset_type,
@@ -227,7 +204,7 @@ def remapped_and_subsetted_callset_path(
227204
project_guid: str,
228205
) -> str:
229206
return os.path.join(
230-
_pipeline_prefix(
207+
pipeline_prefix(
231208
Env.LOADING_DATASETS_DIR,
232209
reference_genome,
233210
dataset_type,
@@ -243,7 +220,7 @@ def lookup_table_path(
243220
dataset_type: DatasetType,
244221
) -> str:
245222
return os.path.join(
246-
_pipeline_prefix(
223+
pipeline_prefix(
247224
Env.HAIL_SEARCH_DATA_DIR,
248225
reference_genome,
249226
dataset_type,
@@ -257,7 +234,7 @@ def runs_path(
257234
dataset_type: DatasetType,
258235
) -> str:
259236
return os.path.join(
260-
_pipeline_prefix(
237+
pipeline_prefix(
261238
Env.HAIL_SEARCH_DATA_DIR,
262239
reference_genome,
263240
dataset_type,
@@ -272,7 +249,7 @@ def sex_check_table_path(
272249
callset_path: str,
273250
) -> str:
274251
return os.path.join(
275-
_pipeline_prefix(
252+
pipeline_prefix(
276253
Env.LOADING_DATASETS_DIR,
277254
reference_genome,
278255
dataset_type,
@@ -306,6 +283,7 @@ def valid_reference_dataset_path(
306283
) -> str | None:
307284
return os.path.join(
308285
_v03_reference_dataset_prefix(
286+
Env.REFERENCE_DATASETS_DIR,
309287
reference_dataset.access_control,
310288
reference_genome,
311289
),
@@ -318,9 +296,13 @@ def valid_reference_dataset_query_path(
318296
reference_genome: ReferenceGenome,
319297
dataset_type: DatasetType,
320298
reference_dataset_query: ReferenceDatasetQuery,
299+
root=None,
321300
) -> str | None:
301+
if not root:
302+
root = Env.REFERENCE_DATASETS_DIR
322303
return os.path.join(
323304
_v03_reference_dataset_prefix(
305+
root,
324306
reference_dataset_query.access_control,
325307
reference_genome,
326308
),
@@ -334,7 +316,7 @@ def variant_annotations_table_path(
334316
dataset_type: DatasetType,
335317
) -> str:
336318
return os.path.join(
337-
_pipeline_prefix(
319+
pipeline_prefix(
338320
Env.HAIL_SEARCH_DATA_DIR,
339321
reference_genome,
340322
dataset_type,
@@ -348,7 +330,7 @@ def variant_annotations_vcf_path(
348330
dataset_type: DatasetType,
349331
) -> str:
350332
return os.path.join(
351-
_pipeline_prefix(
333+
pipeline_prefix(
352334
Env.HAIL_SEARCH_DATA_DIR,
353335
reference_genome,
354336
dataset_type,
@@ -386,7 +368,7 @@ def project_remap_path(
386368
project_guid: str,
387369
) -> str:
388370
return os.path.join(
389-
_pipeline_prefix(
371+
pipeline_prefix(
390372
Env.LOADING_DATASETS_DIR,
391373
reference_genome,
392374
dataset_type,
@@ -404,7 +386,7 @@ def project_pedigree_path(
404386
project_guid: str,
405387
) -> str:
406388
return os.path.join(
407-
_pipeline_prefix(
389+
pipeline_prefix(
408390
Env.LOADING_DATASETS_DIR,
409391
reference_genome,
410392
dataset_type,

v03_pipeline/lib/reference_datasets/reference_dataset.py

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -166,6 +166,21 @@ class ReferenceDatasetQuery(BaseReferenceDataset, str, Enum):
166166
clinvar_path_variants = 'clinvar_path_variants'
167167
high_af_variants = 'high_af_variants'
168168

169+
@classmethod
170+
def for_reference_genome_dataset_type(
171+
cls,
172+
reference_genome: ReferenceGenome,
173+
dataset_type: DatasetType,
174+
) -> set['ReferenceDatasetQuery']:
175+
return {
176+
dataset
177+
for dataset in super().for_reference_genome_dataset_type(
178+
reference_genome,
179+
dataset_type,
180+
)
181+
if isinstance(dataset, cls)
182+
}
183+
169184
@property
170185
def requires(self) -> ReferenceDataset:
171186
return {
Lines changed: 95 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,95 @@
1+
import os
2+
import subprocess
3+
4+
import luigi
5+
6+
from v03_pipeline.lib.model import Env
7+
from v03_pipeline.lib.paths import pipeline_prefix, valid_reference_dataset_query_path
8+
from v03_pipeline.lib.reference_datasets.reference_dataset import ReferenceDatasetQuery
9+
from v03_pipeline.lib.tasks.base.base_loading_run_params import (
10+
BaseLoadingRunParams,
11+
)
12+
13+
14+
def hail_search_value(value: str) -> str:
15+
return value.replace('SV', 'SV_WGS').replace(
16+
'GCNV',
17+
'SV_WES',
18+
)
19+
20+
21+
def rsync_command(src_path: str, dst_path: str) -> list[str]:
22+
return [
23+
'/bin/bash',
24+
'-cx',
25+
f'mkdir -p {dst_path} && gsutil -qm rsync -rd -x .*runs.* {src_path} {dst_path} && sync {dst_path}',
26+
]
27+
28+
29+
@luigi.util.inherits(BaseLoadingRunParams)
30+
class RsyncToSeqrAppDirsTask(luigi.Task):
31+
def __init__(self, *args, **kwargs):
32+
super().__init__(*args, **kwargs)
33+
self.done = False
34+
35+
def output(self) -> None:
36+
return None
37+
38+
def complete(self) -> bool:
39+
return self.done
40+
41+
def run(self) -> None:
42+
if not (
43+
Env.SEQR_APP_HAIL_SEARCH_DATA_DIR and Env.SEQR_APP_REFERENCE_DATASETS_DIR
44+
):
45+
self.done = True
46+
return
47+
48+
if not (
49+
Env.HAIL_SEARCH_DATA_DIR.startswith('gs://')
50+
and Env.REFERENCE_DATASETS_DIR.startswith('gs://')
51+
):
52+
msg = 'Overridden HAIL_SEARCH_DATA_DIR and REFERENCE_DATASETS_DIR must be Google Cloud buckets.'
53+
raise RuntimeError(msg)
54+
55+
# Sync Pipeline Tables
56+
src_path = pipeline_prefix(
57+
Env.HAIL_SEARCH_DATA_DIR,
58+
self.reference_genome,
59+
self.dataset_type,
60+
)
61+
dst_path = hail_search_value(
62+
pipeline_prefix(
63+
Env.SEQR_APP_HAIL_SEARCH_DATA_DIR,
64+
self.reference_genome,
65+
self.dataset_type,
66+
),
67+
)
68+
subprocess.call(
69+
rsync_command(src_path, dst_path), # noqa: S603
70+
)
71+
72+
# Sync RDQs
73+
for query in ReferenceDatasetQuery.for_reference_genome_dataset_type(
74+
self.reference_genome,
75+
self.dataset_type,
76+
):
77+
src_path = valid_reference_dataset_query_path(
78+
self.reference_genome,
79+
self.dataset_type,
80+
query,
81+
)
82+
dst_path = os.path.join(
83+
hail_search_value(
84+
valid_reference_dataset_query_path(
85+
self.reference_genome,
86+
self.dataset_type,
87+
query,
88+
Env.SEQR_APP_REFERENCE_DATASETS_DIR,
89+
),
90+
),
91+
)
92+
subprocess.call(
93+
rsync_command(src_path, dst_path), # noqa: S603
94+
)
95+
self.done = True

0 commit comments

Comments
 (0)