
Commit 8602958

Local Pipeline api and worker (#876)
* First commit
* ruff
* tasks
* remap/pedigree paths
* add paths
* add model test
* Finish bad requests tests
* ruff
* a pattern for a queue
* cleanup
* Still hackin
* In progress
* almost there?
* lint
* lint worker
* update worker
* Remove force from local pipeline
* well
* cleanup
* finish pipeline worker
* ruff
* ruff
* liftover
* update worker
* ruff
* add crdqs
* remove force
1 parent 598e01b commit 8602958

14 files changed: +409 -13 lines changed

requirements.in

Lines changed: 2 additions & 0 deletions

@@ -5,3 +5,5 @@ luigi>=3.4.0
 gnomad==0.6.4
 google-cloud-storage>=2.14.0
 google-cloud-secret-manager>=2.20.0
+aiofiles==24.1.0
+pydantic==2.8.2

v03_pipeline/api/app.py

Lines changed: 66 additions & 3 deletions

@@ -1,17 +1,80 @@
-from aiohttp import web
+import json
+import os
+import traceback
 
-from v03_pipeline.lib.tasks import *  # noqa: F403
+import aiofiles
+import aiofiles.os
+from aiohttp import web, web_exceptions
+
+from v03_pipeline.api.model import LoadingPipelineRequest
+from v03_pipeline.lib.logger import get_logger
+from v03_pipeline.lib.paths import loading_pipeline_queue_path
+
+logger = get_logger(__name__)
+
+
+@web.middleware
+async def error_middleware(request, handler):
+    try:
+        return await handler(request)
+    except web.HTTPError:
+        logger.exception('HTTPError')
+        raise
+    except Exception as e:
+        logger.exception('Unhandled Exception')
+        error_reason = f'{e}: {traceback.format_exc()}'
+        raise web.HTTPInternalServerError(reason=error_reason) from e
+
+
+async def loading_pipeline_enqueue(request: web.Request) -> web.Response:
+    if not request.body_exists:
+        raise web.HTTPUnprocessableEntity
+
+    try:
+        lpr = LoadingPipelineRequest.model_validate(await request.json())
+    except ValueError as e:
+        raise web.HTTPBadRequest from e
+
+    try:
+        async with aiofiles.open(loading_pipeline_queue_path(), 'r') as f:
+            return web.json_response(
+                {
+                    'Failed to queue due to in process request': json.loads(
+                        await f.read(),
+                    ),
+                },
+                #
+                # The 409 (Conflict) status code indicates that the request
+                # could not be completed due to a conflict with the current
+                # state of the target resource.
+                #
+                status=web_exceptions.HTTPConflict.status_code,
+            )
+    except FileNotFoundError:
+        pass
+
+    async with aiofiles.open(loading_pipeline_queue_path(), 'w') as f:
+        await f.write(lpr.model_dump_json())
+    return web.json_response(
+        {'Successfully queued': lpr.model_dump()},
+        status=web_exceptions.HTTPAccepted.status_code,
+    )
 
 
 async def status(_: web.Request) -> web.Response:
     return web.json_response({'success': True})
 
 
 async def init_web_app():
-    app = web.Application()
+    await aiofiles.os.makedirs(
+        os.path.dirname(loading_pipeline_queue_path()),
+        exist_ok=True,
+    )
+    app = web.Application(middlewares=[error_middleware])
     app.add_routes(
         [
             web.get('/status', status),
+            web.post('/loading_pipeline_enqueue', loading_pipeline_enqueue),
        ],
    )
    return app
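For context, a hedged sketch of how a client might exercise the new endpoint once the app is served. This is not part of the commit; the localhost:8080 address is an assumption for illustration, and the body mirrors the fields of LoadingPipelineRequest.

# Hypothetical client sketch (not in this commit). Assumes the pipeline API
# is being served at localhost:8080.
import asyncio

import aiohttp


async def enqueue() -> None:
    body = {
        'callset_path': 'v03_pipeline/var/test/callsets/1kg_30variants.vcf',
        'projects_to_run': ['project_a'],
        'sample_type': 'WGS',
        'reference_genome': 'GRCh38',
        'dataset_type': 'SNV_INDEL',
    }
    async with aiohttp.ClientSession() as session, session.post(
        'http://localhost:8080/loading_pipeline_enqueue',
        json=body,
    ) as resp:
        # Expect 202 Accepted on success, or 409 Conflict if a request
        # is already sitting in the queue file.
        print(resp.status, await resp.json())


asyncio.run(enqueue())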

v03_pipeline/api/app_test.py

Lines changed: 111 additions & 0 deletions

@@ -0,0 +1,111 @@
+from aiohttp import web_exceptions
+from aiohttp.test_utils import AioHTTPTestCase
+
+from v03_pipeline.api.app import init_web_app
+from v03_pipeline.lib.model import DatasetType, ReferenceGenome, SampleType
+from v03_pipeline.lib.test.mocked_dataroot_testcase import MockedDatarootTestCase
+
+CALLSET_PATH = 'v03_pipeline/var/test/callsets/1kg_30variants.vcf'
+
+
+class AppTest(AioHTTPTestCase, MockedDatarootTestCase):
+    async def get_application(self):
+        return await init_web_app()
+
+    async def test_status(self):
+        async with self.client.request('GET', '/status') as resp:
+            self.assertEqual(resp.status, 200)
+            resp_json = await resp.json()
+        self.assertDictEqual(resp_json, {'success': True})
+
+    async def test_missing_route(self):
+        with self.assertLogs(level='ERROR') as _:
+            async with self.client.request('GET', '/loading_pip') as resp:
+                self.assertEqual(resp.status, web_exceptions.HTTPNotFound.status_code)
+
+    async def test_loading_pipeline_invalid_requests(self):
+        with self.assertLogs(level='ERROR') as log:
+            async with self.client.request('GET', '/loading_pipeline_enqueue') as resp:
+                self.assertEqual(
+                    resp.status,
+                    web_exceptions.HTTPMethodNotAllowed.status_code,
+                )
+            self.assertTrue(
+                'HTTPMethodNotAllowed' in log.output[0],
+            )
+
+        with self.assertLogs(level='ERROR') as log:
+            async with self.client.request('POST', '/loading_pipeline_enqueue') as resp:
+                self.assertEqual(
+                    resp.status,
+                    web_exceptions.HTTPUnprocessableEntity.status_code,
+                )
+            self.assertTrue(
+                'HTTPUnprocessableEntity' in log.output[0],
+            )
+
+        body = {
+            'callset_path': 'missing.vcf',
+            'projects_to_run': ['project_a'],
+            'sample_type': SampleType.WGS.value,
+            'reference_genome': ReferenceGenome.GRCh38.value,
+            'dataset_type': DatasetType.SNV_INDEL.value,
+        }
+        with self.assertLogs(level='ERROR') as log:
+            async with self.client.request(
+                'POST',
+                '/loading_pipeline_enqueue',
+                json=body,
+            ) as resp:
+                self.assertEqual(
+                    resp.status,
+                    web_exceptions.HTTPBadRequest.status_code,
+                )
+            self.assertTrue(
+                'callset_path must point to a file that exists' in log.output[0],
+            )
+
+    async def test_loading_pipeline_enqueue(self):
+        body = {
+            'callset_path': CALLSET_PATH,
+            'projects_to_run': ['project_a'],
+            'sample_type': SampleType.WGS.value,
+            'reference_genome': ReferenceGenome.GRCh38.value,
+            'dataset_type': DatasetType.SNV_INDEL.value,
+        }
+        async with self.client.request(
+            'POST',
+            '/loading_pipeline_enqueue',
+            json=body,
+        ) as resp:
+            self.assertEqual(
+                resp.status,
+                web_exceptions.HTTPAccepted.status_code,
+            )
+            resp_json = await resp.json()
+            self.assertDictEqual(
+                resp_json,
+                {
+                    'Successfully queued': {
+                        'callset_path': 'v03_pipeline/var/test/callsets/1kg_30variants.vcf',
+                        'dataset_type': 'SNV_INDEL',
+                        'ignore_missing_samples_when_remapping': False,
+                        'projects_to_run': ['project_a'],
+                        'reference_genome': 'GRCh38',
+                        'sample_type': 'WGS',
+                        'skip_validation': False,
+                    },
+                },
+            )
+
+        # Second request
+        body['projects_to_run'] = ['project_b', 'project_c']
+        async with self.client.request(
+            'POST',
+            '/loading_pipeline_enqueue',
+            json=body,
+        ) as resp:
+            self.assertEqual(
+                resp.status,
+                web_exceptions.HTTPConflict.status_code,
+            )

v03_pipeline/api/model.py

Lines changed: 27 additions & 0 deletions

@@ -0,0 +1,27 @@
+import hailtop.fs as hfs
+from pydantic import BaseModel, Field, field_validator
+
+from v03_pipeline.lib.model import DatasetType, ReferenceGenome, SampleType
+
+VALID_FILE_TYPES = ['vcf', 'vcf.gz', 'vcf.bgz', 'mt']
+
+
+class LoadingPipelineRequest(BaseModel):
+    callset_path: str
+    projects_to_run: list[str] = Field(min_length=1, frozen=True)
+    sample_type: SampleType
+    reference_genome: ReferenceGenome
+    dataset_type: DatasetType
+    ignore_missing_samples_when_remapping: bool = False
+    skip_validation: bool = False
+
+    @field_validator('callset_path')
+    @classmethod
+    def check_valid_callset_path(cls, callset_path: str) -> str:
+        if not any(callset_path.endswith(file_type) for file_type in VALID_FILE_TYPES):
+            msg = 'callset_path must be a VCF or a Hail Matrix Table'
+            raise ValueError(msg)
+        if not hfs.exists(callset_path):
+            msg = 'callset_path must point to a file that exists'
+            raise ValueError(msg)
+        return callset_path
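This model doubles as the wire format between the API and the worker: the API persists it to the queue file with model_dump_json(), and the worker rehydrates it with model_validate_json(). A minimal round-trip sketch, assuming the repository's environment (hail installed) and that the test callset exists locally, since the validator calls hfs.exists:

# Round-trip sketch: the serialization contract between app.py and the worker.
from v03_pipeline.api.model import LoadingPipelineRequest

raw = {
    'callset_path': 'v03_pipeline/var/test/callsets/1kg_30variants.vcf',
    'projects_to_run': ['project_a'],
    'sample_type': 'WGS',
    'reference_genome': 'GRCh38',
    'dataset_type': 'SNV_INDEL',
}
lpr = LoadingPipelineRequest.model_validate(raw)
payload = lpr.model_dump_json()  # what the API writes to the queue file
same = LoadingPipelineRequest.model_validate_json(payload)  # what the worker reads
assert same == lpr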

v03_pipeline/api/model_test.py

Lines changed: 35 additions & 0 deletions

@@ -0,0 +1,35 @@
+import unittest
+
+from v03_pipeline.api.model import LoadingPipelineRequest
+from v03_pipeline.lib.model import DatasetType, ReferenceGenome, SampleType
+
+TEST_VCF = 'v03_pipeline/var/test/callsets/1kg_30variants.vcf'
+
+
+class ModelTest(unittest.TestCase):
+    def test_valid_loading_pipeline_requests(self) -> None:
+        raw_request = {
+            'callset_path': TEST_VCF,
+            'projects_to_run': ['project_a'],
+            'sample_type': SampleType.WGS.value,
+            'reference_genome': ReferenceGenome.GRCh38.value,
+            'dataset_type': DatasetType.SNV_INDEL.value,
+        }
+        lpr = LoadingPipelineRequest.model_validate(raw_request)
+        self.assertEqual(lpr.reference_genome, ReferenceGenome.GRCh38)
+
+    def test_invalid_loading_pipeline_requests(self) -> None:
+        raw_request = {
+            'callset_path': 'a.txt',
+            'projects_to_run': [],
+            'sample_type': 'BLENDED',
+            'reference_genome': ReferenceGenome.GRCh38.value,
+            'dataset_type': DatasetType.SNV_INDEL.value,
+        }
+        with self.assertRaises(ValueError) as cm:
+            LoadingPipelineRequest.model_validate(raw_request)
+        self.assertTrue(
+            str(cm.exception).startswith(
+                '3 validation errors for LoadingPipelineRequest',
+            ),
+        )

v03_pipeline/bin/pipeline_worker.py

Lines changed: 89 additions & 0 deletions

@@ -0,0 +1,89 @@
+#!/usr/bin/env python3
+import datetime
+import os
+import time
+
+import luigi
+
+from v03_pipeline.api.model import LoadingPipelineRequest
+from v03_pipeline.lib.logger import get_logger
+from v03_pipeline.lib.paths import (
+    loading_pipeline_queue_path,
+    project_pedigree_path,
+    project_remap_path,
+)
+from v03_pipeline.lib.tasks import (
+    UpdateCachedReferenceDatasetQueries,
+    UpdateVariantAnnotationsTableWithNewSamplesTask,
+    WriteProjectFamilyTablesTask,
+)
+
+logger = get_logger(__name__)
+
+
+def main():
+    while True:
+        try:
+            if not os.path.exists(loading_pipeline_queue_path()):
+                continue
+            with open(loading_pipeline_queue_path()) as f:
+                lpr = LoadingPipelineRequest.model_validate_json(f.read())
+            project_remap_paths = [
+                project_remap_path(
+                    lpr.reference_genome,
+                    lpr.dataset_type,
+                    lpr.sample_type,
+                    project_guid,
+                )
+                for project_guid in lpr.projects_to_run
+            ]
+            project_pedigree_paths = [
+                project_pedigree_path(
+                    lpr.reference_genome,
+                    lpr.dataset_type,
+                    lpr.sample_type,
+                    project_guid,
+                )
+                for project_guid in lpr.projects_to_run
+            ]
+            task_kwargs = {
+                k: v for k, v in lpr.model_dump().items() if k != 'projects_to_run'
+            }
+            tasks = [
+                UpdateCachedReferenceDatasetQueries(
+                    reference_genome=lpr.reference_genome,
+                    dataset_type=lpr.dataset_type,
+                ),
+                UpdateVariantAnnotationsTableWithNewSamplesTask(
+                    project_guids=lpr.projects_to_run,
+                    project_remap_paths=project_remap_paths,
+                    project_pedigree_paths=project_pedigree_paths,
+                    run_id=datetime.datetime.now(datetime.timezone.utc).strftime(
+                        '%Y%m%d-%H%M%S',
+                    ),
+                    force=False,
+                    **task_kwargs,
+                ),
+                *[
+                    WriteProjectFamilyTablesTask(
+                        project_guid=lpr.projects_to_run[i],
+                        project_remap_path=project_remap_paths[i],
+                        project_pedigree_path=project_pedigree_paths[i],
+                        force=False,
+                        **task_kwargs,
+                    )
+                    for i in range(len(lpr.projects_to_run))
+                ],
+            ]
+            luigi.build(tasks)
+        except Exception:
+            logger.exception('Unhandled Exception')
+        finally:
+            if os.path.exists(loading_pipeline_queue_path()):
+                os.remove(loading_pipeline_queue_path())
+            logger.info('Waiting for work')
+            time.sleep(1)
+
+
+if __name__ == '__main__':
+    main()
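The API and the worker never share a process; their only contract is the queue file, which the API refuses to overwrite (409) and the worker deletes even when a run fails. A stripped-down sketch of that handoff, with a hypothetical /tmp path standing in for loading_pipeline_queue_path() and a plain dict standing in for the pydantic model:

# Illustration only: the single-file queue gives at-most-one pending request.
import json
import os

QUEUE = '/tmp/loading_pipeline_queue.json'  # hypothetical stand-in path

# Producer (API side): refuse to enqueue if a request is already pending.
request = {'callset_path': '1kg_30variants.vcf', 'projects_to_run': ['project_a']}
if os.path.exists(QUEUE):
    print('409 Conflict: a request is already queued')
else:
    with open(QUEUE, 'w') as f:
        json.dump(request, f)

# Consumer (worker side): drain the queue, then delete it even if the run fails.
if os.path.exists(QUEUE):
    try:
        with open(QUEUE) as f:
            pending = json.load(f)
        print('running pipeline for', pending['projects_to_run'])
    finally:
        os.remove(QUEUE)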

v03_pipeline/lib/logger.py

Lines changed: 3 additions & 0 deletions

@@ -28,6 +28,9 @@
         'urllib3': {
             'level': 'CRITICAL',
         },
+        'asyncio': {
+            'level': 'CRITICAL',
+        },
     },
 }

v03_pipeline/lib/model/cached_reference_dataset_query.py

Lines changed: 1 addition & 1 deletion

@@ -13,7 +13,7 @@
 )
 
 
-class CachedReferenceDatasetQuery(Enum):
+class CachedReferenceDatasetQuery(str, Enum):
     CLINVAR_PATH_VARIANTS = 'clinvar_path_variants'
     GNOMAD_CODING_AND_NONCODING_VARIANTS = 'gnomad_coding_and_noncoding_variants'
     GNOMAD_QC = 'gnomad_qc'

v03_pipeline/lib/model/dataset_type.py

Lines changed: 1 addition & 1 deletion

@@ -10,7 +10,7 @@
 ZERO = 0.0
 
 
-class DatasetType(Enum):
+class DatasetType(str, Enum):
     GCNV = 'GCNV'
     MITO = 'MITO'
     SNV_INDEL = 'SNV_INDEL'
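Both enum changes above add a str mixin, so members behave as plain strings: they compare equal to their raw values and serialize directly to JSON, which the pydantic request model and the queue-file round trip rely on. A standalone demonstration with toy enums (not the pipeline's own):

# Why `str, Enum`: value equality and JSON serializability.
import json
from enum import Enum


class Plain(Enum):
    A = 'A'


class Mixed(str, Enum):
    A = 'A'


assert Mixed.A == 'A'       # str mixin: member equals its raw value
assert Plain.A != 'A'       # plain Enum: member does not equal its value
print(json.dumps(Mixed.A))  # prints "A" -- serializes as a string
# json.dumps(Plain.A) would raise TypeError: Object of type Plain
# is not JSON serializable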
