Skip to content

Commit 73d1669

Browse files
authored
add task to write success file for run (#920)
* add task to write success file for run
* remove WriteProjectFamilyTablesTask from pipeline_worker
* no hfs
* add to init
1 parent e15e33e commit 73d1669

File tree

5 files changed

+97
-25
lines changed

5 files changed

+97
-25
lines changed

v03_pipeline/bin/pipeline_worker.py

Lines changed: 11 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -15,9 +15,9 @@
1515
)
1616
from v03_pipeline.lib.tasks import (
1717
UpdateVariantAnnotationsTableWithNewSamplesTask,
18-
WriteProjectFamilyTablesTask,
1918
)
2019
from v03_pipeline.lib.tasks.trigger_hail_backend_reload import TriggerHailBackendReload
20+
from v03_pipeline.lib.tasks.write_success_file import WriteSuccessFileTask
2121

2222
logger = get_logger(__name__)
2323

@@ -47,39 +47,25 @@ def main():
4747
)
4848
for project_guid in lpr.projects_to_run
4949
]
50-
task_kwargs = {
51-
k: v for k, v in lpr.model_dump().items() if k != 'projects_to_run'
52-
}
5350
run_id = datetime.datetime.now(datetime.timezone.utc).strftime(
5451
'%Y%m%d-%H%M%S',
5552
)
53+
loading_run_task_params = {
54+
'project_guids': lpr.projects_to_run,
55+
'project_remap_paths': project_remap_paths,
56+
'project_pedigree_paths': project_pedigree_paths,
57+
'run_id': run_id,
58+
**{k: v for k, v in lpr.model_dump().items() if k != 'projects_to_run'},
59+
}
5660
tasks = [
5761
UpdateVariantAnnotationsTableWithNewSamplesTask(
58-
project_guids=lpr.projects_to_run,
59-
project_remap_paths=project_remap_paths,
60-
project_pedigree_paths=project_pedigree_paths,
61-
run_id=run_id,
62-
**task_kwargs,
62+
**loading_run_task_params,
6363
),
64-
*[
65-
WriteProjectFamilyTablesTask(
66-
project_guid=lpr.projects_to_run[i],
67-
project_remap_path=project_remap_paths[i],
68-
project_pedigree_path=project_pedigree_paths[i],
69-
**task_kwargs,
70-
)
71-
for i in range(len(lpr.projects_to_run))
72-
],
64+
WriteSuccessFileTask(**loading_run_task_params),
7365
]
7466
if Env.SHOULD_TRIGGER_HAIL_BACKEND_RELOAD:
7567
tasks.append(
76-
TriggerHailBackendReload(
77-
project_guids=lpr.projects_to_run,
78-
project_remap_paths=project_remap_paths,
79-
project_pedigree_paths=project_pedigree_paths,
80-
run_id=run_id,
81-
**task_kwargs,
82-
),
68+
TriggerHailBackendReload(**loading_run_task_params),
8369
)
8470
luigi.build(tasks)
8571
except Exception:

v03_pipeline/lib/paths.py

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -366,3 +366,18 @@ def loading_pipeline_queue_path() -> str:
366366
'loading_pipeline_queue',
367367
'request.json',
368368
)
369+
370+
371+
def pipeline_run_success_file_path(
    reference_genome: ReferenceGenome,
    dataset_type: DatasetType,
    run_id: str,
) -> str:
    """Return the path of the `_SUCCESS` marker file for a pipeline run.

    The result is `<runs_path>/<run_id>/_SUCCESS`, where `<runs_path>` is
    the value returned by `runs_path` for the given reference genome and
    dataset type.
    """
    runs_root = runs_path(reference_genome, dataset_type)
    return os.path.join(runs_root, run_id, '_SUCCESS')

v03_pipeline/lib/tasks/__init__.py

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -34,6 +34,7 @@
3434
from v03_pipeline.lib.tasks.write_project_family_tables import (
3535
WriteProjectFamilyTablesTask,
3636
)
37+
from v03_pipeline.lib.tasks.write_success_file import WriteSuccessFileTask
3738

3839
__all__ = [
3940
'DeleteFamilyTableTask',
@@ -52,4 +53,5 @@
5253
'UpdateVariantAnnotationsTableWithDeletedFamiliesTask',
5354
'WriteMetadataForRunTask',
5455
'WriteProjectFamilyTablesTask',
56+
'WriteSuccessFileTask',
5557
]
v03_pipeline/lib/tasks/write_success_file.py

Lines changed: 36 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,36 @@
1+
import luigi
2+
import luigi.util
3+
4+
from v03_pipeline.lib.paths import pipeline_run_success_file_path
5+
from v03_pipeline.lib.tasks import WriteProjectFamilyTablesTask
6+
from v03_pipeline.lib.tasks.base.base_project_info_params import (
7+
BaseLoadingRunWithProjectInfoParams,
8+
)
9+
from v03_pipeline.lib.tasks.files import GCSorLocalTarget
10+
11+
12+
@luigi.util.inherits(BaseLoadingRunWithProjectInfoParams)
class WriteSuccessFileTask(luigi.Task):
    """Luigi task that writes an empty `_SUCCESS` marker file for a run.

    Parameters are inherited from `BaseLoadingRunWithProjectInfoParams`.
    The task requires one `WriteProjectFamilyTablesTask` per entry in
    `project_guids`.
    """

    def output(self) -> luigi.Target:
        """Return the target wrapping this run's `_SUCCESS` file path."""
        success_path = pipeline_run_success_file_path(
            self.reference_genome,
            self.dataset_type,
            self.run_id,
        )
        return GCSorLocalTarget(success_path)

    def requires(self):
        """Return one `WriteProjectFamilyTablesTask` clone per project.

        The remap and pedigree paths for a project are looked up at the same
        index the project occupies in `project_guids`.
        """
        return [
            self.clone(
                WriteProjectFamilyTablesTask,
                project_guid=project_guid,
                project_remap_path=self.project_remap_paths[i],
                project_pedigree_path=self.project_pedigree_paths[i],
            )
            for i, project_guid in enumerate(self.project_guids)
        ]

    def run(self):
        """Create the success file with empty contents."""
        with self.output().open('w') as success_file:
            success_file.write('')
Lines changed: 33 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,33 @@
1+
from unittest import mock
2+
3+
import luigi.worker
4+
5+
from v03_pipeline.lib.model import DatasetType, ReferenceGenome, SampleType
6+
from v03_pipeline.lib.tasks.write_success_file import WriteSuccessFileTask
7+
from v03_pipeline.lib.test.mock_complete_task import MockCompleteTask
8+
from v03_pipeline.lib.test.mocked_dataroot_testcase import MockedDatarootTestCase
9+
10+
11+
class WriteSuccessFileTaskTest(MockedDatarootTestCase):
    """Tests for `WriteSuccessFileTask`."""

    @mock.patch(
        'v03_pipeline.lib.tasks.write_success_file.WriteProjectFamilyTablesTask',
    )
    def test_write_success_file_task(self, mock_write_project_fam_tables) -> None:
        """Running the task marks it complete and leaves an empty output file."""
        # The patched upstream task class hands back a MockCompleteTask
        # instance whenever it is called.
        mock_write_project_fam_tables.return_value = MockCompleteTask()

        luigi_worker = luigi.worker.Worker()
        task = WriteSuccessFileTask(
            reference_genome=ReferenceGenome.GRCh38,
            dataset_type=DatasetType.SNV_INDEL,
            sample_type=SampleType.WGS,
            callset_path='test_callset',
            project_guids=['R0113_test_project'],
            project_remap_paths=['test_remap'],
            project_pedigree_paths=['test_pedigree'],
            run_id='manual__2024-04-03',
        )
        luigi_worker.add(task)
        luigi_worker.run()

        self.assertTrue(task.complete())
        with open(task.output().path) as success_file:
            contents = success_file.read()
        self.assertEqual(contents, '')

0 commit comments

Comments
 (0)