Skip to content

Commit e91dd66

Browse files
jklugherz and bpblanken authored
add task to reload hail backend globals (#902)
* make run id shared
* add task to reload hail backend globals
* ruff
* simpler
* fix run_id"
* A fix
* another test
* A few more
* Few more
* Unnecessary
* ruf
* run id
* string
* unused
* missed one
* Fix it correctly
* missed one
* Last one!
* add dependency and base params
* add new param...
* comments

---------

Co-authored-by: Benjamin Blankenmeister <bblanken@broadinstitute.org>
1 parent 04376e8 commit e91dd66

9 files changed

+172
-27
lines changed

v03_pipeline/bin/pipeline_worker.py

Lines changed: 16 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@
77

88
from v03_pipeline.api.model import LoadingPipelineRequest
99
from v03_pipeline.lib.logger import get_logger
10+
from v03_pipeline.lib.model import Env
1011
from v03_pipeline.lib.paths import (
1112
loading_pipeline_queue_path,
1213
project_pedigree_path,
@@ -16,6 +17,7 @@
1617
UpdateVariantAnnotationsTableWithNewSamplesTask,
1718
WriteProjectFamilyTablesTask,
1819
)
20+
from v03_pipeline.lib.tasks.trigger_hail_backend_reload import TriggerHailBackendReload
1921

2022
logger = get_logger(__name__)
2123

@@ -48,14 +50,15 @@ def main():
4850
task_kwargs = {
4951
k: v for k, v in lpr.model_dump().items() if k != 'projects_to_run'
5052
}
53+
run_id = datetime.datetime.now(datetime.timezone.utc).strftime(
54+
'%Y%m%d-%H%M%S',
55+
)
5156
tasks = [
5257
UpdateVariantAnnotationsTableWithNewSamplesTask(
5358
project_guids=lpr.projects_to_run,
5459
project_remap_paths=project_remap_paths,
5560
project_pedigree_paths=project_pedigree_paths,
56-
run_id=datetime.datetime.now(datetime.timezone.utc).strftime(
57-
'%Y%m%d-%H%M%S',
58-
),
61+
run_id=run_id,
5962
**task_kwargs,
6063
),
6164
*[
@@ -68,6 +71,16 @@ def main():
6871
for i in range(len(lpr.projects_to_run))
6972
],
7073
]
74+
if Env.SHOULD_TRIGGER_HAIL_BACKEND_RELOAD:
75+
tasks.append(
76+
TriggerHailBackendReload(
77+
project_guids=lpr.projects_to_run,
78+
project_remap_paths=project_remap_paths,
79+
project_pedigree_paths=project_pedigree_paths,
80+
run_id=run_id,
81+
**task_kwargs,
82+
),
83+
)
7184
luigi.build(tasks)
7285
except Exception:
7386
logger.exception('Unhandled Exception')

v03_pipeline/lib/model/environment.py

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,8 @@
2828
'VEP_REFERENCE_DATASETS_DIR',
2929
'/seqr/vep-reference-data',
3030
)
31+
HAIL_BACKEND_SERVICE_HOSTNAME = os.environ.get('HAIL_BACKEND_SERVICE_HOSTNAME')
32+
HAIL_BACKEND_SERVICE_PORT = int(os.environ.get('HAIL_BACKEND_SERVICE_PORT', '5000'))
3133

3234
# Allele registry secrets :/
3335
ALLELE_REGISTRY_SECRET_NAME = os.environ.get('ALLELE_REGISTRY_SECRET_NAME', None)
@@ -40,6 +42,9 @@
4042
CHECK_SEX_AND_RELATEDNESS = os.environ.get('CHECK_SEX_AND_RELATEDNESS') == '1'
4143
EXPECT_WES_FILTERS = os.environ.get('EXPECT_WES_FILTERS') == '1'
4244
SHOULD_REGISTER_ALLELES = os.environ.get('SHOULD_REGISTER_ALLELES') == '1'
45+
SHOULD_TRIGGER_HAIL_BACKEND_RELOAD = (
46+
os.environ.get('SHOULD_TRIGGER_HAIL_BACKEND_RELOAD') == '1'
47+
)
4348

4449

4550
@dataclass
@@ -48,6 +53,8 @@ class Env:
4853
ALLELE_REGISTRY_SECRET_NAME: str | None = ALLELE_REGISTRY_SECRET_NAME
4954
CHECK_SEX_AND_RELATEDNESS: bool = CHECK_SEX_AND_RELATEDNESS
5055
EXPECT_WES_FILTERS: bool = EXPECT_WES_FILTERS
56+
HAIL_BACKEND_SERVICE_HOSTNAME: str | None = HAIL_BACKEND_SERVICE_HOSTNAME
57+
HAIL_BACKEND_SERVICE_PORT: int = HAIL_BACKEND_SERVICE_PORT
5158
HAIL_TMP_DIR: str = HAIL_TMP_DIR
5259
HAIL_SEARCH_DATA_DIR: str = HAIL_SEARCH_DATA_DIR
5360
GRCH37_TO_GRCH38_LIFTOVER_REF_PATH: str = GRCH37_TO_GRCH38_LIFTOVER_REF_PATH
@@ -57,4 +64,5 @@ class Env:
5764
PROJECT_ID: str | None = PROJECT_ID
5865
REFERENCE_DATASETS_DIR: str = REFERENCE_DATASETS_DIR
5966
SHOULD_REGISTER_ALLELES: bool = SHOULD_REGISTER_ALLELES
67+
SHOULD_TRIGGER_HAIL_BACKEND_RELOAD: bool = SHOULD_TRIGGER_HAIL_BACKEND_RELOAD
6068
VEP_REFERENCE_DATASETS_DIR: str = VEP_REFERENCE_DATASETS_DIR

v03_pipeline/lib/tasks/base/base_project_info_params.py

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,11 @@
1+
import luigi
2+
import luigi.util
3+
4+
from v03_pipeline.lib.tasks.base.base_loading_run_params import BaseLoadingRunParams
5+
6+
7+
@luigi.util.inherits(BaseLoadingRunParams)
class BaseLoadingRunWithProjectInfoParams(luigi.Task):
    """Parameter-only base task for loading runs that carry project info.

    Combines every parameter of ``BaseLoadingRunParams`` (pulled in through
    ``luigi.util.inherits``) with the three project-level list parameters
    declared below. Tasks decorated with
    ``@luigi.util.inherits(BaseLoadingRunWithProjectInfoParams)`` receive the
    combined parameter set instead of redeclaring the lists themselves.
    """

    # Declaration order is kept as-is: luigi uses it for positional arguments.
    # Identifiers of the projects included in the run.
    project_guids = luigi.ListParameter()
    # presumably one remap file path per entry in project_guids -- TODO confirm
    project_remap_paths = luigi.ListParameter()
    # presumably one pedigree file path per entry in project_guids -- TODO confirm
    project_pedigree_paths = luigi.ListParameter()

v03_pipeline/lib/tasks/trigger_hail_backend_reload.py

Lines changed: 31 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,31 @@
1+
import luigi
2+
import luigi.util
3+
import requests
4+
5+
from v03_pipeline.lib.logger import get_logger
6+
from v03_pipeline.lib.model import Env
7+
from v03_pipeline.lib.tasks import UpdateVariantAnnotationsTableWithNewSamplesTask
8+
from v03_pipeline.lib.tasks.base.base_project_info_params import (
9+
BaseLoadingRunWithProjectInfoParams,
10+
)
11+
12+
logger = get_logger(__name__)
13+
14+
15+
@luigi.util.inherits(BaseLoadingRunWithProjectInfoParams)
class TriggerHailBackendReload(luigi.Task):
    """Ask the hail backend service to reload its globals after a loading run.

    Completion is tracked with an in-memory flag rather than an output
    target, so a freshly constructed instance always reports incomplete.
    """

    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)
        # Flipped to True only after the reload request succeeds in run().
        self.done = False

    def complete(self):
        """Return True once run() has finished without raising."""
        return self.done

    def requires(self):
        """Depend on the annotations-table update, cloned with this task's parameters."""
        return self.clone(UpdateVariantAnnotationsTableWithNewSamplesTask)

    def run(self):
        """POST to the backend's ``/reload_globals`` endpoint and mark the task done.

        Raises:
            requests.HTTPError: from ``raise_for_status`` when the backend
                answers with an error status; ``done`` then stays False.
        """
        # NOTE(review): no scheme is prepended here, so
        # HAIL_BACKEND_SERVICE_HOSTNAME presumably already carries one
        # (e.g. "http://...") -- confirm against the deployment config.
        response = requests.post(
            f'{Env.HAIL_BACKEND_SERVICE_HOSTNAME}:{Env.HAIL_BACKEND_SERVICE_PORT}/reload_globals',
            headers={'From': 'pipeline-runner'},
            timeout=300,
        )
        response.raise_for_status()
        self.done = True
Lines changed: 90 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,90 @@
1+
from unittest.mock import Mock, patch
2+
3+
import hail as hl
4+
import luigi.worker
5+
import requests
6+
7+
from v03_pipeline.lib.misc.io import remap_pedigree_hash
8+
from v03_pipeline.lib.model import DatasetType, ReferenceGenome, SampleType
9+
from v03_pipeline.lib.paths import variant_annotations_table_path
10+
from v03_pipeline.lib.tasks.trigger_hail_backend_reload import TriggerHailBackendReload
11+
from v03_pipeline.lib.test.mocked_dataroot_testcase import MockedDatarootTestCase
12+
13+
TEST_VCF = 'v03_pipeline/var/test/callsets/1kg_30variants.vcf'
14+
TEST_REMAP = 'v03_pipeline/var/test/remaps/test_remap_1.tsv'
15+
TEST_PEDIGREE_3 = 'v03_pipeline/var/test/pedigrees/test_pedigree_3.tsv'
16+
17+
18+
class TriggerHailBackendReloadTestCase(MockedDatarootTestCase):
    """Exercise TriggerHailBackendReload against a mocked ``requests.post``."""

    def setUp(self) -> None:
        """Write an empty annotations table whose globals already record the test update.

        The ``updates`` global lists the test callset/project/remap-pedigree
        hash, presumably so the upstream annotations task reports complete and
        only the reload task itself has to run -- TODO confirm.
        """
        super().setUp()
        annotations_ht = hl.Table.parallelize(
            [],
            hl.tstruct(
                locus=hl.tlocus(ReferenceGenome.GRCh38.value),
                alleles=hl.tarray(hl.tstr),
            ),
            key=('locus', 'alleles'),
            globals=hl.Struct(
                paths=hl.Struct(),
                versions=hl.Struct(),
                enums=hl.Struct(),
                updates={
                    hl.Struct(
                        callset=TEST_VCF,
                        project_guid='R0113_test_project',
                        remap_pedigree_hash=hl.eval(
                            remap_pedigree_hash(TEST_REMAP, TEST_PEDIGREE_3),
                        ),
                    ),
                },
                migrations=hl.empty_array(hl.tstr),
            ),
        )
        annotations_ht.write(
            variant_annotations_table_path(
                ReferenceGenome.GRCh38,
                DatasetType.SNV_INDEL,
            ),
        )

    @patch.object(requests, 'post')
    def test_success(self, mock_post: Mock):
        """A 200 response from the backend marks the task complete."""
        mock_resp = requests.models.Response()
        mock_resp.status_code = 200
        mock_post.return_value = mock_resp

        worker = luigi.worker.Worker()
        task = TriggerHailBackendReload(
            reference_genome=ReferenceGenome.GRCh38,
            dataset_type=DatasetType.SNV_INDEL,
            sample_type=SampleType.WES,
            callset_path=TEST_VCF,
            project_guids=['R0113_test_project'],
            run_id='manual__2024-09-20',
            project_remap_paths=[TEST_REMAP],
            project_pedigree_paths=[TEST_PEDIGREE_3],
        )
        worker.add(task)
        worker.run()
        self.assertTrue(task.complete())

    @patch.object(requests, 'post')
    def test_failure(self, mock_post: Mock):
        """A 500 response from the backend leaves the task incomplete."""
        mock_resp = requests.models.Response()
        mock_resp.status_code = 500
        mock_post.return_value = mock_resp

        worker = luigi.worker.Worker()
        task = TriggerHailBackendReload(
            reference_genome=ReferenceGenome.GRCh38,
            dataset_type=DatasetType.SNV_INDEL,
            sample_type=SampleType.WES,
            callset_path=TEST_VCF,
            project_guids=['R0113_test_project'],
            run_id='manual__2024-09-20',
            project_remap_paths=[TEST_REMAP],
            project_pedigree_paths=[TEST_PEDIGREE_3],
        )
        worker.add(task)
        # Actually run the task: without this the request is never attempted
        # and the assertion below passes trivially on a fresh instance.
        worker.run()
        # Proves the failure came from the mocked 500, not from a skipped run.
        mock_post.assert_called_once()
        self.assertFalse(task.complete())

v03_pipeline/lib/tasks/update_lookup_table.py

Lines changed: 4 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -10,7 +10,9 @@
1010
)
1111
from v03_pipeline.lib.model.constants import PROJECTS_EXCLUDED_FROM_LOOKUP
1212
from v03_pipeline.lib.paths import remapped_and_subsetted_callset_path
13-
from v03_pipeline.lib.tasks.base.base_loading_run_params import BaseLoadingRunParams
13+
from v03_pipeline.lib.tasks.base.base_project_info_params import (
14+
BaseLoadingRunWithProjectInfoParams,
15+
)
1416
from v03_pipeline.lib.tasks.base.base_update_lookup_table import (
1517
BaseUpdateLookupTableTask,
1618
)
@@ -19,12 +21,8 @@
1921
)
2022

2123

22-
@luigi.util.inherits(BaseLoadingRunParams)
24+
@luigi.util.inherits(BaseLoadingRunWithProjectInfoParams)
2325
class UpdateLookupTableTask(BaseUpdateLookupTableTask):
24-
project_guids = luigi.ListParameter()
25-
project_remap_paths = luigi.ListParameter()
26-
project_pedigree_paths = luigi.ListParameter()
27-
2826
def complete(self) -> bool:
2927
return super().complete() and hl.eval(
3028
hl.bind(

v03_pipeline/lib/tasks/update_variant_annotations_table_with_new_samples.py

Lines changed: 4 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -9,21 +9,19 @@
99
lookup_table_path,
1010
new_variants_table_path,
1111
)
12-
from v03_pipeline.lib.tasks.base.base_loading_run_params import BaseLoadingRunParams
12+
from v03_pipeline.lib.tasks.base.base_project_info_params import (
13+
BaseLoadingRunWithProjectInfoParams,
14+
)
1315
from v03_pipeline.lib.tasks.base.base_update_variant_annotations_table import (
1416
BaseUpdateVariantAnnotationsTableTask,
1517
)
1618
from v03_pipeline.lib.tasks.write_new_variants_table import WriteNewVariantsTableTask
1719

1820

19-
@luigi.util.inherits(BaseLoadingRunParams)
21+
@luigi.util.inherits(BaseLoadingRunWithProjectInfoParams)
2022
class UpdateVariantAnnotationsTableWithNewSamplesTask(
2123
BaseUpdateVariantAnnotationsTableTask,
2224
):
23-
project_guids = luigi.ListParameter()
24-
project_remap_paths = luigi.ListParameter()
25-
project_pedigree_paths = luigi.ListParameter()
26-
2725
def requires(self) -> list[luigi.Task]:
2826
return [
2927
*super().requires(),

v03_pipeline/lib/tasks/write_metadata_for_run.py

Lines changed: 4 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -5,19 +5,17 @@
55
import luigi.util
66

77
from v03_pipeline.lib.paths import metadata_for_run_path
8-
from v03_pipeline.lib.tasks.base.base_loading_run_params import BaseLoadingRunParams
8+
from v03_pipeline.lib.tasks.base.base_project_info_params import (
9+
BaseLoadingRunWithProjectInfoParams,
10+
)
911
from v03_pipeline.lib.tasks.files import GCSorLocalTarget
1012
from v03_pipeline.lib.tasks.write_remapped_and_subsetted_callset import (
1113
WriteRemappedAndSubsettedCallsetTask,
1214
)
1315

1416

15-
@luigi.util.inherits(BaseLoadingRunParams)
17+
@luigi.util.inherits(BaseLoadingRunWithProjectInfoParams)
1618
class WriteMetadataForRunTask(luigi.Task):
17-
project_guids = luigi.ListParameter()
18-
project_remap_paths = luigi.ListParameter()
19-
project_pedigree_paths = luigi.ListParameter()
20-
2119
def output(self) -> luigi.Target:
2220
return GCSorLocalTarget(
2321
metadata_for_run_path(

v03_pipeline/lib/tasks/write_new_variants_table.py

Lines changed: 4 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -24,7 +24,9 @@
2424
load_gencode_ensembl_to_refseq_id,
2525
load_gencode_gene_symbol_to_gene_id,
2626
)
27-
from v03_pipeline.lib.tasks.base.base_loading_run_params import BaseLoadingRunParams
27+
from v03_pipeline.lib.tasks.base.base_project_info_params import (
28+
BaseLoadingRunWithProjectInfoParams,
29+
)
2830
from v03_pipeline.lib.tasks.base.base_write import BaseWriteTask
2931
from v03_pipeline.lib.tasks.files import GCSorLocalTarget
3032
from v03_pipeline.lib.tasks.reference_data.update_variant_annotations_table_with_updated_reference_dataset import (
@@ -43,12 +45,8 @@
4345
GENCODE_FOR_VEP_RELEASE = 44
4446

4547

46-
@luigi.util.inherits(BaseLoadingRunParams)
48+
@luigi.util.inherits(BaseLoadingRunWithProjectInfoParams)
4749
class WriteNewVariantsTableTask(BaseWriteTask):
48-
project_guids = luigi.ListParameter()
49-
project_remap_paths = luigi.ListParameter()
50-
project_pedigree_paths = luigi.ListParameter()
51-
5250
@property
5351
def annotation_dependencies(self) -> dict[str, hl.Table]:
5452
deps = get_rdc_annotation_dependencies(self.dataset_type, self.reference_genome)

0 commit comments

Comments
 (0)