Local Pipeline api and worker #876


Merged
merged 32 commits into dev from benb/local_pipeline on Aug 28, 2024

Conversation

bpblanken (Collaborator):

No description provided.

Base automatically changed from benb/some_inheritance_refactoring to benb/remove_reference_data_env_var August 19, 2024 17:51
@bpblanken bpblanken changed the base branch from benb/remove_reference_data_env_var to dev August 20, 2024 22:25
raise web.HTTPInternalServerError(reason=error_reason) from e


async def loading_pipeline_enqueue(request: web.Request) -> web.Response:
bpblanken (Collaborator Author):

The idea here is for this process to accept requests and manage a queue of length one (just a file on the filesystem). It did not seem feasible for this process itself to run the luigi tasks, since they are likely to take many hours.
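The length-one, file-backed queue described above could be sketched roughly as follows (the file path, payload handling, and function name are illustrative assumptions, not the PR's actual code). An atomic `O_CREAT | O_EXCL` open rejects a second request while one is already pending:

```python
import os

# Hypothetical queue file path; the real location is defined by the PR.
QUEUE_FILE = '/tmp/loading_pipeline_request.json'

def try_enqueue(payload: str) -> bool:
    """Return True if the request was queued, False if one is already pending."""
    try:
        # O_CREAT | O_EXCL fails atomically if the file already exists,
        # enforcing a queue of length one.
        fd = os.open(QUEUE_FILE, os.O_CREAT | os.O_EXCL | os.O_WRONLY)
    except FileExistsError:
        return False
    with os.fdopen(fd, 'w') as f:
        f.write(payload)
    return True
```

The endpoint would then return HTTP 202 when the request is queued and an error status when the single slot is already occupied.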

@@ -5,3 +5,5 @@ luigi>=3.4.0
gnomad==0.6.4
google-cloud-storage>=2.14.0
google-cloud-secret-manager>=2.20.0
aiofiles==24.1.0
pydantic==2.8.2
bpblanken (Collaborator Author):

aiofiles for async-aware file I/O, and pydantic to help with POST body validation/typing.

logger = get_logger(__name__)


def main():
bpblanken (Collaborator Author):

The idea is for this process to run as a sidecar in the same pod, looping indefinitely until a request shows up, blocking while it does the work, then releasing the request once the run succeeds or otherwise finishes.

for i in range(len(lpr.projects_to_run))
],
]
luigi.build(tasks)
bpblanken (Collaborator Author) commented on Aug 21, 2024:

There are two options for scheduling here:

  • Run with --local-scheduler, which means this process will handle scheduling. This is what we're doing now in dataproc and works fine.
  • We stand up a second process in the pod that's running the luigi central scheduler (literally just the luigid command) that will expose the luigi UI. The centralized scheduler would be doing more for us if we had multiple workers, but with only one it isn't strictly necessary.

My preference is to at least try to get the centralized scheduler working, since it appears lightweight and the UI is passable. It was super easy to get working locally (I went down that rabbit hole for about an hour last week when trying to run tasks inside of aiohttp, and came out pretty quickly). The main unknown is how to expose the luigid endpoint in helm correctly, but I think this is at most a day of futzing.

Collaborator:

Agreed that if we can get this done in under about a week it would be worth it; having a UI exposed to users so they can see how pipeline runs are progressing would be a very valuable feature.

@@ -13,7 +13,7 @@
)


class CachedReferenceDatasetQuery(Enum):
class CachedReferenceDatasetQuery(str, Enum):
bpblanken (Collaborator Author):

StrEnum is only available in Python 3.11+, so we subclass (str, Enum) instead; the str mixin is necessary for the JSON serialization in the endpoint.
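A quick illustration of why the `str` mixin matters: `json.dumps` serializes a `(str, Enum)` member as its plain string value, while a bare `Enum` member raises a `TypeError`. (The member name below is made up for the example.)

```python
import json
from enum import Enum

class PlainQuery(Enum):
    CLINVAR_PATH = 'clinvar_path'  # hypothetical member

class StrQuery(str, Enum):
    CLINVAR_PATH = 'clinvar_path'  # hypothetical member

print(json.dumps({'query': StrQuery.CLINVAR_PATH}))  # {"query": "clinvar_path"}

try:
    json.dumps({'query': PlainQuery.CLINVAR_PATH})
except TypeError as e:
    print('plain Enum is not serializable:', e)
```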

k: v for k, v in lpr.model_dump().items() if k != 'projects_to_run'
}
tasks = [
UpdateCachedReferenceDatasetQueries(
bpblanken (Collaborator Author):

These are the three tasks necessary to get everything to run correctly. They do not have any dependency structure between them, though they share upstream dependencies (like UpdatedReferenceDataset or WriteImportedCallset).

The blockers to condensing this to a single task are mostly around how we have split tasks apart in airflow. We have a for project_guid in projects_to_run loop there so that each project task can be viewed in isolation.

@bpblanken bpblanken changed the title Benb/local pipeline Local Pipeline api and worker Aug 21, 2024
@bpblanken bpblanken marked this pull request as ready for review August 21, 2024 17:36
@bpblanken bpblanken requested a review from a team as a code owner August 21, 2024 17:36
hanars (Collaborator) left a comment:

Exciting stuff!

return web.json_response(
{'Successfully queued': lpr.model_dump()},
status=web_exceptions.HTTPAccepted.status_code,
)


async def status(_: web.Request) -> web.Response:
return web.json_response({'success': True})


async def init_web_app():
Contributor:

Why use an async web server here as opposed to a normal synchronous one?

bpblanken (Collaborator Author):

Good question! This is basically a consistency choice: this web server comes embedded in the hail docker image and is what the hail search service uses. It is, however, overkill, especially if it requires aiofiles just to touch the filesystem 🤷

@bpblanken bpblanken merged commit 8602958 into dev Aug 28, 2024
3 checks passed
@bpblanken bpblanken deleted the benb/local_pipeline branch August 28, 2024 16:05