Skip to content

Commit 5686e5e

Browse files
committed
refactor to ensure we do all the tasks before writing a success file
1 parent 49da02e commit 5686e5e

File tree

3 files changed

+24
-20
lines changed

3 files changed

+24
-20
lines changed

v03_pipeline/bin/pipeline_worker.py

Lines changed: 6 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -13,9 +13,6 @@
1313
project_pedigree_path,
1414
project_remap_path,
1515
)
16-
from v03_pipeline.lib.tasks import (
17-
UpdateVariantAnnotationsTableWithNewSamplesTask,
18-
)
1916
from v03_pipeline.lib.tasks.trigger_hail_backend_reload import TriggerHailBackendReload
2017
from v03_pipeline.lib.tasks.write_success_file import WriteSuccessFileTask
2118

@@ -57,16 +54,14 @@ def main():
5754
'run_id': run_id,
5855
**{k: v for k, v in lpr.model_dump().items() if k != 'projects_to_run'},
5956
}
60-
tasks = [
61-
UpdateVariantAnnotationsTableWithNewSamplesTask(
62-
**loading_run_task_params,
63-
),
64-
WriteSuccessFileTask(**loading_run_task_params),
65-
]
6657
if Env.SHOULD_TRIGGER_HAIL_BACKEND_RELOAD:
67-
tasks.append(
58+
tasks = [
6859
TriggerHailBackendReload(**loading_run_task_params),
69-
)
60+
]
61+
else:
62+
tasks = [
63+
WriteSuccessFileTask(**loading_run_task_params),
64+
]
7065
luigi.build(tasks)
7166
except Exception:
7267
logger.exception('Unhandled Exception')

v03_pipeline/lib/tasks/trigger_hail_backend_reload.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -4,10 +4,10 @@
44

55
from v03_pipeline.lib.logger import get_logger
66
from v03_pipeline.lib.model import Env
7-
from v03_pipeline.lib.tasks import UpdateVariantAnnotationsTableWithNewSamplesTask
87
from v03_pipeline.lib.tasks.base.base_project_info_params import (
98
BaseLoadingRunWithProjectInfoParams,
109
)
10+
from v03_pipeline.lib.tasks.write_success_file import WriteSuccessFileTask
1111

1212
logger = get_logger(__name__)
1313

@@ -19,7 +19,7 @@ def __init__(self, *args, **kwargs):
1919
self.done = False
2020

2121
def requires(self):
22-
return self.clone(UpdateVariantAnnotationsTableWithNewSamplesTask)
22+
return self.clone(WriteSuccessFileTask)
2323

2424
def run(self):
2525
url = f'http://{Env.HAIL_BACKEND_SERVICE_HOSTNAME}:{Env.HAIL_BACKEND_SERVICE_PORT}/reload_globals'

v03_pipeline/lib/tasks/write_success_file.py

Lines changed: 16 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,9 @@
77
BaseLoadingRunWithProjectInfoParams,
88
)
99
from v03_pipeline.lib.tasks.files import GCSorLocalTarget
10+
from v03_pipeline.lib.tasks.update_variant_annotations_table_with_new_samples import (
11+
UpdateVariantAnnotationsTableWithNewSamplesTask,
12+
)
1013

1114

1215
@luigi.util.inherits(BaseLoadingRunWithProjectInfoParams)
@@ -21,14 +24,20 @@ def output(self) -> luigi.Target:
2124
)
2225

2326
def requires(self):
27+
requirements = [
28+
self.clone(UpdateVariantAnnotationsTableWithNewSamplesTask),
29+
]
2430
return [
25-
self.clone(
26-
WriteProjectFamilyTablesTask,
27-
project_guid=self.project_guids[i],
28-
project_remap_path=self.project_remap_paths[i],
29-
project_pedigree_path=self.project_pedigree_paths[i],
30-
)
31-
for i in range(len(self.project_guids))
31+
*requirements,
32+
*[
33+
self.clone(
34+
WriteProjectFamilyTablesTask,
35+
project_guid=self.project_guids[i],
36+
project_remap_path=self.project_remap_paths[i],
37+
project_pedigree_path=self.project_pedigree_paths[i],
38+
)
39+
for i in range(len(self.project_guids))
40+
],
3241
]
3342

3443
def run(self):

0 commit comments

Comments
 (0)