
Commit b64f3e9

Merge pull request #701 from broadinstitute/benb/delete_old_runs
Delete old pipeline runs.
2 parents 62f1979 + ea6044e commit b64f3e9

5 files changed: +185 −5 lines changed


v03_pipeline/lib/paths.py

Lines changed: 15 additions & 3 deletions
@@ -79,12 +79,10 @@ def metadata_for_run_path(
     run_id: str,
 ) -> str:
     return os.path.join(
-        _v03_pipeline_prefix(
-            Env.HAIL_SEARCH_DATA,
+        runs_path(
             reference_genome,
             dataset_type,
         ),
-        'runs',
         run_id,
         'metadata.json',
     )
@@ -140,6 +138,20 @@ def remapped_and_subsetted_callset_path(
     )
 
 
+def runs_path(
+    reference_genome: ReferenceGenome,
+    dataset_type: DatasetType,
+) -> str:
+    return os.path.join(
+        _v03_pipeline_prefix(
+            Env.HAIL_SEARCH_DATA,
+            reference_genome,
+            dataset_type,
+        ),
+        'runs',
+    )
+
+
 def sample_lookup_table_path(
     reference_genome: ReferenceGenome,
     dataset_type: DatasetType,

v03_pipeline/lib/tasks/__init__.py

Lines changed: 2 additions & 0 deletions
@@ -1,3 +1,4 @@
+from v03_pipeline.lib.tasks.delete_old_runs import DeleteOldRunsTask
 from v03_pipeline.lib.tasks.update_project_table import UpdateProjectTableTask
 from v03_pipeline.lib.tasks.update_sample_lookup_table import (
     UpdateSampleLookupTableTask,
@@ -11,6 +12,7 @@
 )
 
 __all__ = [
+    'DeleteOldRunsTask',
     'UpdateProjectTableTask',
     'UpdateSampleLookupTableTask',
     'UpdateVariantAnnotationsTableWithNewSamplesTask',
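
With the re-export above, the new task can be scheduled like the other v03 pipeline tasks. A minimal invocation sketch (not part of this commit) using luigi's in-process scheduler and the same parameters the tests below pass:

import luigi

from v03_pipeline.lib.model import DatasetType, ReferenceGenome, SampleType
from v03_pipeline.lib.tasks import DeleteOldRunsTask

# Hypothetical one-off run; the deployed pipeline schedules tasks through its own workers.
luigi.build(
    [
        DeleteOldRunsTask(
            reference_genome=ReferenceGenome.GRCh38,
            dataset_type=DatasetType.SNV_INDEL,
            sample_type=SampleType.WGS,
        ),
    ],
    local_scheduler=True,
)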
v03_pipeline/lib/tasks/delete_old_runs.py

Lines changed: 53 additions & 0 deletions
@@ -0,0 +1,53 @@
+import os
+
+import hail as hl
+
+from v03_pipeline.lib.paths import runs_path
+from v03_pipeline.lib.tasks.base.base_hail_table_task import BaseHailTableTask
+from v03_pipeline.lib.tasks.files import GCSorLocalTarget
+
+MIN_SUCCESSFUL_RUNS = 10
+
+
+class DeleteOldRunsTask(BaseHailTableTask):
+    def __init__(self, *args, **kwargs):
+        super().__init__(*args, **kwargs)
+        self._complete = False
+
+    def complete(self) -> bool:
+        return self._complete
+
+    def run(self) -> None:
+        run_dir_paths = sorted(
+            [
+                rd['path']
+                for rd in hl.hadoop_ls(
+                    runs_path(
+                        self.reference_genome,
+                        self.dataset_type,
+                    ),
+                )
+                if rd['is_dir']
+            ],
+        )
+        successful_run_dir_paths = [
+            run_dir_path
+            for run_dir_path in run_dir_paths
+            if hl.hadoop_exists(
+                os.path.join(
+                    run_dir_path,
+                    '_SUCCESS',
+                ),
+            )
+        ]
+        if len(successful_run_dir_paths) < MIN_SUCCESSFUL_RUNS:
+            self._complete = True
+            return
+
+        # Delete run dirs until we encounter the first of the N successful runs to keep.
+        oldest_successful_run_index = run_dir_paths.index(
+            successful_run_dir_paths[-MIN_SUCCESSFUL_RUNS],
+        )
+        for run_dir_path in run_dir_paths[:oldest_successful_run_index]:
+            GCSorLocalTarget(run_dir_path.replace('file:', '')).remove()
+        self._complete = True
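
To make the retention rule concrete, a small worked example of the same index arithmetic on plain lists (MIN_SUCCESSFUL_RUNS lowered to 3 for brevity; the task itself keeps the 10 most recent successful runs and lists directories with hl.hadoop_ls):

MIN_SUCCESSFUL_RUNS = 3

run_dir_paths = ['r01', 'r02', 'r03', 'r04', 'r05', 'r06']  # sorted oldest -> newest
successful_run_dir_paths = ['r01', 'r02', 'r04', 'r05', 'r06']  # 'r03' never wrote _SUCCESS

# The oldest of the N most recent successful runs marks the cutoff.
oldest_successful_run_index = run_dir_paths.index(
    successful_run_dir_paths[-MIN_SUCCESSFUL_RUNS],
)  # index of 'r04' -> 3
to_delete = run_dir_paths[:oldest_successful_run_index]  # ['r01', 'r02', 'r03']
# An incomplete run newer than the cutoff is kept, which is what
# test_leave_incomplete_runs asserts for manual__2024-01-13 and -16.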
v03_pipeline/lib/tasks/delete_old_runs_test.py

Lines changed: 113 additions & 0 deletions
@@ -0,0 +1,113 @@
+import os
+from pathlib import Path
+
+import hail as hl
+import luigi.worker
+
+from v03_pipeline.lib.model import DatasetType, ReferenceGenome, SampleType
+from v03_pipeline.lib.paths import runs_path
+from v03_pipeline.lib.tasks.delete_old_runs import DeleteOldRunsTask
+from v03_pipeline.lib.test.mocked_dataroot_testcase import MockedDatarootTestCase
+
+
+class DeleteOldRunsTaskTest(MockedDatarootTestCase):
+    def test_too_few_successful_runs(self) -> None:
+        for run_dir in [
+            'manual__2024-01-05',
+            'manual__2024-01-06',
+            'manual__2024-01-07',
+            'manual__2024-01-08',
+            'manual__2024-01-09',
+            'manual__2024-01-10',
+            'manual__2024-01-11',
+        ]:
+            run_dir_path = os.path.join(
+                runs_path(
+                    ReferenceGenome.GRCh38,
+                    DatasetType.SNV_INDEL,
+                ),
+                run_dir,
+            )
+            Path(run_dir_path).mkdir(parents=True, exist_ok=True)
+            Path((run_dir_path) / Path('_SUCCESS')).touch()
+        worker = luigi.worker.Worker()
+        write_metadata_for_run_task = DeleteOldRunsTask(
+            reference_genome=ReferenceGenome.GRCh38,
+            dataset_type=DatasetType.SNV_INDEL,
+            sample_type=SampleType.WGS,
+        )
+        worker.add(write_metadata_for_run_task)
+        worker.run()
+        self.assertTrue(write_metadata_for_run_task.complete())
+        self.assertEqual(
+            len(
+                hl.hadoop_ls(
+                    runs_path(
+                        ReferenceGenome.GRCh38,
+                        DatasetType.SNV_INDEL,
+                    ),
+                ),
+            ),
+            7,
+        )
+
+    def test_leave_incomplete_runs(self) -> None:
+        for run_dir in [
+            'manual__2024-01-05',
+            'manual__2024-01-06',
+            'manual__2024-01-07',
+            'manual__2024-01-08',
+            'manual__2024-01-09',
+            'manual__2024-01-10',
+            'manual__2024-01-11',
+            'manual__2024-01-12',
+            'manual__2024-01-13',
+            'manual__2024-01-14',
+            'manual__2024-01-15',
+            'manual__2024-01-16',
+            'manual__2024-01-17',
+        ]:
+            run_dir_path = os.path.join(
+                runs_path(
+                    ReferenceGenome.GRCh38,
+                    DatasetType.SNV_INDEL,
+                ),
+                run_dir,
+            )
+            Path(run_dir_path).mkdir(parents=True, exist_ok=True)
+
+            # Force a couple of incomplete runs
+            if run_dir not in {'manual__2024-01-13', 'manual__2024-01-16'}:
+                Path((run_dir_path) / Path('_SUCCESS')).touch()
+
+        worker = luigi.worker.Worker()
+        write_metadata_for_run_task = DeleteOldRunsTask(
+            reference_genome=ReferenceGenome.GRCh38,
+            dataset_type=DatasetType.SNV_INDEL,
+            sample_type=SampleType.WGS,
+        )
+        worker.add(write_metadata_for_run_task)
+        worker.run()
+        self.assertTrue(write_metadata_for_run_task.complete())
+        self.assertEqual(
+            len(
+                hl.hadoop_ls(
+                    runs_path(
+                        ReferenceGenome.GRCh38,
+                        DatasetType.SNV_INDEL,
+                    ),
+                ),
+            ),
+            12,
+        )
+        self.assertFalse(
+            hl.hadoop_exists(
+                os.path.join(
+                    runs_path(
+                        ReferenceGenome.GRCh38,
+                        DatasetType.SNV_INDEL,
+                    ),
+                    'manual__2024-01-05',
+                ),
+            ),
+        )

v03_pipeline/lib/tasks/write_metadata_for_run.py

Lines changed: 2 additions & 2 deletions
@@ -5,14 +5,14 @@
 
 from v03_pipeline.lib.misc.util import callset_project_pairs
 from v03_pipeline.lib.paths import metadata_for_run_path
-from v03_pipeline.lib.tasks.base.base_write_task import BaseWriteTask
+from v03_pipeline.lib.tasks.base.base_hail_table_task import BaseHailTableTask
 from v03_pipeline.lib.tasks.files import GCSorLocalTarget
 from v03_pipeline.lib.tasks.write_remapped_and_subsetted_callset import (
     WriteRemappedAndSubsettedCallsetTask,
 )
 
 
-class WriteMetadataForRunTask(BaseWriteTask):
+class WriteMetadataForRunTask(BaseHailTableTask):
     callset_paths = luigi.ListParameter()
     project_guids = luigi.ListParameter()
     project_remap_paths = luigi.ListParameter()
