Local Pipeline api and worker #876
Conversation
```python
raise web.HTTPInternalServerError(reason=error_reason) from e


async def loading_pipeline_enqueue(request: web.Request) -> web.Response:
```
The idea here is for this process to accept requests and manage a queue of length one (just a file on the filesystem). It did not seem feasible for this process itself to run the luigi tasks, since they are likely to take many hours.
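A minimal sketch of how a length-one, file-backed queue could gate the endpoint, assuming aiohttp and a hypothetical queue-file path (illustrative only, not the PR's actual handler):

```python
import aiofiles
import aiofiles.os
from aiohttp import web

QUEUE_FILE = '/tmp/loading_pipeline_request.json'  # hypothetical path

async def loading_pipeline_enqueue(request: web.Request) -> web.Response:
    # the presence of the file *is* the queue of length one
    if await aiofiles.os.path.exists(QUEUE_FILE):
        raise web.HTTPConflict(reason='a pipeline run is already queued')
    async with aiofiles.open(QUEUE_FILE, 'w') as f:
        await f.write(await request.text())
    return web.json_response({'queued': True}, status=202)
```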
```diff
@@ -5,3 +5,5 @@ luigi>=3.4.0
 gnomad==0.6.4
 google-cloud-storage>=2.14.0
 google-cloud-secret-manager>=2.20.0
+aiofiles==24.1.0
+pydantic==2.8.2
```
`aiofiles` for async-aware file io and `pydantic` to help with the POST body validation/typing.
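For example, the POST body validation might look roughly like this (`projects_to_run` and the `lpr` variable appear in the diffs below; the model name, the other field, and the helper are assumptions):

```python
import pydantic
from aiohttp import web

class LoadingPipelineRequest(pydantic.BaseModel):
    callset_path: str           # hypothetical field
    projects_to_run: list[str]  # referenced in the diffs below

async def parse_body(request: web.Request) -> LoadingPipelineRequest:
    try:
        return LoadingPipelineRequest.model_validate(await request.json())
    except pydantic.ValidationError as e:
        raise web.HTTPBadRequest(reason=str(e)) from e
```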
```python
logger = get_logger(__name__)


def main():
```
The idea is for this process to run as a sidecar in the same pod, basically looping indefinitely until a request shows up, then blocking on doing the work, then releasing the request once it succeeds or finishes.
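A sketch of that loop, reusing the hypothetical queue file and `LoadingPipelineRequest` model from the sketches above; the polling interval and the `run_loading_pipeline` helper are also assumptions:

```python
import os
import time

QUEUE_FILE = '/tmp/loading_pipeline_request.json'  # hypothetical path

def main():
    while True:
        if not os.path.exists(QUEUE_FILE):
            time.sleep(5)  # assumed polling interval
            continue
        with open(QUEUE_FILE) as f:
            lpr = LoadingPipelineRequest.model_validate_json(f.read())
        run_loading_pipeline(lpr)  # hypothetical: builds and runs the luigi tasks
        os.remove(QUEUE_FILE)      # release the queue slot only after success
```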
```python
        for i in range(len(lpr.projects_to_run))
    ],
]
luigi.build(tasks)
```
There are two options for scheduling here:

- Run with `--local-scheduler`, which means this process will handle scheduling itself. This is what we're doing now in dataproc and it works fine.
- Stand up a second process in the pod running the luigi central scheduler (literally just the `luigid` command), which will also expose the luigi UI. The centralized scheduler would be doing more for us if we had multiple workers, but with only one it isn't strictly necessary.

My preference is to at least try to get the centralized scheduler working (see the sketch below), since it appears lightweight and the UI is passable. It was super easy to get working locally (I went down the rabbit hole for about an hour last week when trying to run tasks inside of aiohttp and came up pretty quickly). The main unknown is how to expose the `luigid` endpoint in helm correctly, but I think that is at most a day of futzing.
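For reference, the two modes differ only in how `luigi.build` is invoked (a sketch; the host and port are assumptions, 8082 being `luigid`'s default):

```python
import luigi

# option 1: in-process scheduling, as on dataproc today
luigi.build(tasks, local_scheduler=True)

# option 2: a sidecar in the pod runs `luigid`, which also serves the UI,
# and this worker points at it
luigi.build(tasks, scheduler_host='localhost', scheduler_port=8082)
```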
Agreed that if we can get this done in under about a week it would be worth it; having a UI exposed to users so they can see how pipeline runs are progressing would be a very valuable feature.
```diff
@@ -13,7 +13,7 @@
 )


-class CachedReferenceDatasetQuery(Enum):
+class CachedReferenceDatasetQuery(str, Enum):
```
`StrEnum` is available in python3.11 only, so we use the `str` mixin here instead. It's necessary for the json serialization in the endpoint.
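A quick illustration of why the mixin matters for `json.dumps` (the member shown is illustrative):

```python
import json
from enum import Enum

class CachedReferenceDatasetQuery(str, Enum):
    GNOMAD_QC = 'gnomad_qc'  # illustrative member

json.dumps({'query': CachedReferenceDatasetQuery.GNOMAD_QC})
# -> '{"query": "gnomad_qc"}'
# without the str mixin, json.dumps raises:
# TypeError: Object of type CachedReferenceDatasetQuery is not JSON serializable
```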
```python
    k: v for k, v in lpr.model_dump().items() if k != 'projects_to_run'
}
tasks = [
    UpdateCachedReferenceDatasetQueries(
```
These are the three tasks necessary to get everything to run correctly. They do not have any dependency structure between them, though they share upstream dependencies (like `UpdatedReferenceDataset` or `WriteImportedCallset`).

The blockers to condensing this to a single task are mostly around how we have split tasks apart in airflow. We have a `for project_guid in projects_to_run` loop there so that each project task can be viewed in isolation.
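A hedged sketch of the resulting task list shape; the two task names marked as stand-ins (and the `project_i` parameter) are not the PR's actual identifiers:

```python
kwargs = {
    k: v for k, v in lpr.model_dump().items() if k != 'projects_to_run'
}
tasks = [
    UpdateCachedReferenceDatasetQueries(**kwargs),
    UpdateAnnotationsTask(**kwargs),              # stand-in name
    *(
        UpdateProjectTask(project_i=i, **kwargs)  # stand-in name
        for i in range(len(lpr.projects_to_run))
    ),
]
luigi.build(tasks)
```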
Exciting stuff!
```python
return web.json_response(
    {'Successfully queued': lpr.model_dump()},
    status=web_exceptions.HTTPAccepted.status_code,
)


async def status(_: web.Request) -> web.Response:
    return web.json_response({'success': True})


async def init_web_app():
```
Why use an async web server here as opposed to a normal synchronous one?
Good question! This is basically a consistency choice: this web server comes embedded in the hail docker image and is what the hail search service uses. It is, however, overkill, especially if it requires aiofiles just to touch the filesystem 🤷