Skip to content

Commit 5dcf568

Browse files
committed
Delete old runs
1 parent cdc5a8b commit 5dcf568

File tree

4 files changed

+186
-5
lines changed

4 files changed

+186
-5
lines changed

v03_pipeline/lib/paths.py

Lines changed: 15 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -79,12 +79,10 @@ def metadata_for_run_path(
7979
run_id: str,
8080
) -> str:
8181
return os.path.join(
82-
_v03_pipeline_prefix(
83-
Env.HAIL_SEARCH_DATA,
82+
runs_path(
8483
reference_genome,
8584
dataset_type,
8685
),
87-
'runs',
8886
run_id,
8987
'metadata.json',
9088
)
@@ -140,6 +138,20 @@ def remapped_and_subsetted_callset_path(
140138
)
141139

142140

141+
def runs_path(
142+
reference_genome: ReferenceGenome,
143+
dataset_type: DatasetType,
144+
) -> str:
145+
return os.path.join(
146+
_v03_pipeline_prefix(
147+
Env.HAIL_SEARCH_DATA,
148+
reference_genome,
149+
dataset_type,
150+
),
151+
'runs',
152+
)
153+
154+
143155
def sample_lookup_table_path(
144156
reference_genome: ReferenceGenome,
145157
dataset_type: DatasetType,
Lines changed: 57 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,57 @@
1+
import os
2+
import shutil
3+
4+
import hail as hl
5+
6+
from v03_pipeline.lib.paths import runs_path
7+
from v03_pipeline.lib.tasks.base.base_hail_table_task import BaseHailTableTask
8+
from v03_pipeline.lib.tasks.files import GCSorLocalTarget
9+
10+
MIN_SUCCESSFUL_RUNS = 10
11+
12+
13+
class DeleteOldRunsTask(BaseHailTableTask):
14+
def __init__(self, *args, **kwargs):
15+
super().__init__(*args, **kwargs)
16+
self._complete = False
17+
18+
def complete(self) -> bool:
19+
return self._complete
20+
21+
def run(self) -> None:
22+
run_dir_paths = sorted(
23+
[
24+
rd['path']
25+
for rd in hl.hadoop_ls(
26+
runs_path(
27+
self.reference_genome,
28+
self.dataset_type,
29+
),
30+
)
31+
if rd['is_dir']
32+
]
33+
)
34+
successful_run_dir_paths = [
35+
run_dir_path
36+
for run_dir_path in run_dir_paths
37+
if hl.hadoop_exists(
38+
os.path.join(
39+
run_dir_path,
40+
'_SUCCESS',
41+
),
42+
)
43+
]
44+
if len(successful_run_dir_paths) < MIN_SUCCESSFUL_RUNS:
45+
self._complete = True
46+
return
47+
48+
# Delete run dirs until we encounter the first of the N successful runs to keep.
49+
oldest_successful_run_index = run_dir_paths.index(successful_run_dir_paths[-MIN_SUCCESSFUL_RUNS])
50+
for run_dir_path in run_dir_paths[:oldest_successful_run_index]:
51+
# NB: our existing GCSorLocalTarget does not work here because
52+
# the LocalTarget API does not support removing directories.
53+
if run_dir_path.startswith('gs://'):
54+
gcs.GCSTarget(pathname).remove(recursive=True)
55+
else:
56+
shutil.rmtree(run_dir_path)
57+
self._complete = True
Lines changed: 112 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,112 @@
1+
import os
2+
from pathlib import Path
3+
4+
import hail as hl
5+
import luigi.worker
6+
7+
from v03_pipeline.lib.model import DatasetType, ReferenceGenome, SampleType
8+
from v03_pipeline.lib.paths import runs_path
9+
from v03_pipeline.lib.tasks.delete_old_runs import DeleteOldRunsTask
10+
from v03_pipeline.lib.test.mocked_dataroot_testcase import MockedDatarootTestCase
11+
12+
13+
class DeleteOldRunsTaskTest(MockedDatarootTestCase):
14+
def test_too_few_successful_runs(self) -> None:
15+
for run_dir in [
16+
'manual__2024-01-05',
17+
'manual__2024-01-06',
18+
'manual__2024-01-07',
19+
'manual__2024-01-08',
20+
'manual__2024-01-09',
21+
'manual__2024-01-10',
22+
'manual__2024-01-11',
23+
]:
24+
run_dir_path = os.path.join(
25+
runs_path(
26+
ReferenceGenome.GRCh38,
27+
DatasetType.SNV_INDEL,
28+
),
29+
run_dir,
30+
)
31+
Path(run_dir_path).mkdir(parents=True, exist_ok=True)
32+
Path((run_dir_path) / Path('_SUCCESS')).touch()
33+
worker = luigi.worker.Worker()
34+
write_metadata_for_run_task = DeleteOldRunsTask(
35+
reference_genome=ReferenceGenome.GRCh38,
36+
dataset_type=DatasetType.SNV_INDEL,
37+
sample_type=SampleType.WGS,
38+
)
39+
worker.add(write_metadata_for_run_task)
40+
worker.run()
41+
self.assertTrue(write_metadata_for_run_task.complete())
42+
self.assertEqual(
43+
len(
44+
hl.hadoop_ls(
45+
runs_path(
46+
ReferenceGenome.GRCh38,
47+
DatasetType.SNV_INDEL,
48+
)
49+
)
50+
),
51+
7,
52+
)
53+
54+
55+
def test_leave_incomplete_runs(self) -> None:
56+
for run_dir in [
57+
'manual__2024-01-05',
58+
'manual__2024-01-06',
59+
'manual__2024-01-07',
60+
'manual__2024-01-08',
61+
'manual__2024-01-09',
62+
'manual__2024-01-10',
63+
'manual__2024-01-11',
64+
'manual__2024-01-12',
65+
'manual__2024-01-13',
66+
'manual__2024-01-14',
67+
'manual__2024-01-15',
68+
'manual__2024-01-16',
69+
'manual__2024-01-17',
70+
]:
71+
run_dir_path = os.path.join(
72+
runs_path(
73+
ReferenceGenome.GRCh38,
74+
DatasetType.SNV_INDEL,
75+
),
76+
run_dir,
77+
)
78+
Path(run_dir_path).mkdir(parents=True, exist_ok=True)
79+
80+
# Force a couple of incomplete runs
81+
if run_dir not in set(['manual__2024-01-13', 'manual__2024-01-16']):
82+
Path((run_dir_path) / Path('_SUCCESS')).touch()
83+
84+
worker = luigi.worker.Worker()
85+
write_metadata_for_run_task = DeleteOldRunsTask(
86+
reference_genome=ReferenceGenome.GRCh38,
87+
dataset_type=DatasetType.SNV_INDEL,
88+
sample_type=SampleType.WGS,
89+
)
90+
worker.add(write_metadata_for_run_task)
91+
worker.run()
92+
self.assertTrue(write_metadata_for_run_task.complete())
93+
self.assertEqual(
94+
len(
95+
hl.hadoop_ls(
96+
runs_path(
97+
ReferenceGenome.GRCh38,
98+
DatasetType.SNV_INDEL,
99+
)
100+
)
101+
),
102+
12,
103+
)
104+
self.assertFalse(
105+
hl.hadoop_exists(
106+
runs_path(
107+
ReferenceGenome.GRCh38,
108+
DatasetType.SNV_INDEL,
109+
),
110+
'manual__2024-01-05'
111+
)
112+
)

v03_pipeline/lib/tasks/write_metadata_for_run.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -5,14 +5,14 @@
55

66
from v03_pipeline.lib.misc.util import callset_project_pairs
77
from v03_pipeline.lib.paths import metadata_for_run_path
8-
from v03_pipeline.lib.tasks.base.base_write_task import BaseWriteTask
8+
from v03_pipeline.lib.tasks.base.base_hail_table_task import BaseHailTableTask
99
from v03_pipeline.lib.tasks.files import GCSorLocalTarget
1010
from v03_pipeline.lib.tasks.write_remapped_and_subsetted_callset import (
1111
WriteRemappedAndSubsettedCallsetTask,
1212
)
1313

1414

15-
class WriteMetadataForRunTask(BaseWriteTask):
15+
class WriteMetadataForRunTask(BaseHailTableTask):
1616
callset_paths = luigi.ListParameter()
1717
project_guids = luigi.ListParameter()
1818
project_remap_paths = luigi.ListParameter()

0 commit comments

Comments
 (0)