Description
Describe the current behavior
Currently it is not possible to set a shared global concurrency limit for multiple deployments using the prefect CLI tool or the Python SDK. I was, however, able to do so using a combination of CLI and REST API calls.
Why do I need it
We have a single K8s worker in our cluster and a single work pool. There are two types of deployments, A and B, that I would like to trigger. I would like to impose a global concurrency limit on both deployments, such that at any given moment there are no more than N concurrent runs of A and B combined, that is, number_of_running_flows(A) + number_of_running_flows(B) <= N.
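To make the desired semantics concrete, here is a toy, in-memory model of one limit shared by two deployments. It is only a sketch of the invariant above, not how Prefect implements concurrency limits; `SharedLimit`, `try_start`, and `finish` are hypothetical names invented for illustration.

```python
from collections import Counter


class SharedLimit:
    """Toy model of one concurrency limit shared by several deployments."""

    def __init__(self, limit: int) -> None:
        self.limit = limit
        self.running = Counter()  # deployment name -> number of active runs

    def try_start(self, deployment: str) -> bool:
        # A run may start only while the combined count is below the limit.
        if sum(self.running.values()) >= self.limit:
            return False
        self.running[deployment] += 1
        return True

    def finish(self, deployment: str) -> None:
        # Release one slot held by the given deployment, if it holds any.
        if self.running[deployment] > 0:
            self.running[deployment] -= 1


limit = SharedLimit(limit=2)
# Two runs of A take both slots, so the run of B has to wait.
results = [limit.try_start("A"), limit.try_start("A"), limit.try_start("B")]
# Once one run of A finishes, B can start.
limit.finish("A")
results.append(limit.try_start("B"))
print(results)  # [True, True, False, True]
```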
Currently available solution
I need to create a separate work pool for A and B, which requires a dedicated worker for that pool because, AFAIU, there is a one-to-one relationship between a worker and a work pool.
What is wrong with that?
I would rather avoid running a separate worker just for that: it seems an unnecessary waste of K8s CPU and memory, and one more infrastructure component to take care of.
My hack that avoids adding additional work pool / worker to impose a concurrency limit on multiple deployments
My colleague noticed that the REST API for POST /deployments/ has a global_concurrency_limit_id parameter that cannot be set through prefect.yaml, which is the key to this hack:
1. Create two flows that we want to limit:

   import prefect
   import time

   # Two long-running flows; each needs its own name.
   @prefect.flow(name="a")
   def a():
       time.sleep(1500)

   @prefect.flow(name="b")
   def b():
       time.sleep(1500)

2. Create deployments in prefect.yaml for the flows above.
3. Create a global concurrency limit (GCL) named two-at-most:

   prefect gcl create two-at-most --limit 2

   Save the ID of the newly created GCL (I looked it up with prefect gcl ls).
4. Bind the newly created global concurrency limit two-at-most to both deployments. I used FastAPI's docs page at /api/docs to make the REST calls. Call the PATCH /deployments/{id} endpoint with the ID of deployment A and a body that looks like this:

   {
     "global_concurrency_limit_id": "id-of-the-gcl"
   }

   where id-of-the-gcl is the ID of our newly created GCL. Do the same for deployment B.
5. Trigger two flow runs of deployment A and one flow run of deployment B, and observe that B's run waits, as the GCL requires.
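Step 4 can also be scripted instead of clicking through /api/docs. The following is a minimal sketch using only the standard library. The endpoint path and the body field come from the steps above; the base URL, the placeholder IDs, and the helper names `build_patch_request` and `bind_gcl` are assumptions made for illustration.

```python
import json
import urllib.request


def build_patch_request(api_url: str, deployment_id: str, gcl_id: str) -> urllib.request.Request:
    """Build the PATCH /deployments/{id} request that binds a GCL to a deployment."""
    body = json.dumps({"global_concurrency_limit_id": gcl_id}).encode("utf-8")
    return urllib.request.Request(
        url=f"{api_url.rstrip('/')}/deployments/{deployment_id}",
        data=body,
        method="PATCH",
        headers={"Content-Type": "application/json"},
    )


def bind_gcl(api_url: str, deployment_ids: list, gcl_id: str) -> None:
    """Send the PATCH request for every deployment that should share the GCL."""
    for deployment_id in deployment_ids:
        request = build_patch_request(api_url, deployment_id, gcl_id)
        with urllib.request.urlopen(request) as response:
            print(deployment_id, response.status)


# Example call (assumed local API URL; the IDs are placeholders):
# bind_gcl("http://127.0.0.1:4200/api", ["id-of-deployment-a", "id-of-deployment-b"], "id-of-the-gcl")
```

The request construction is kept separate from the network call so it can be inspected without a running server.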
Describe the proposed behavior
In effect, all the components needed to share one concurrency limit across different deployments are already in place; there is just no convenient way to set global_concurrency_limit_id. I would be happy if it were possible to set global_concurrency_limit_id in prefect.yaml.
I looked at the implementation, and it seems that one would need to add global_concurrency_limit_id to the prefect.deployments.runner.RunnerDeployment class and pass
global_concurrency_limit_id=deploy_config.get("global_concurrency_limit_id"),
to the invocation of the RunnerDeployment constructor in prefect.cli.deploy._run_single_deploy.
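The shape of that change might look roughly like the sketch below. It does not use Prefect's code: `DeploymentSketch` is a hypothetical stand-in for RunnerDeployment, and `from_deploy_config` is a hypothetical stand-in for the constructor call in _run_single_deploy. Only the `deploy_config.get("global_concurrency_limit_id")` expression is taken from the proposal above.

```python
from dataclasses import dataclass
from typing import Any, Optional


@dataclass
class DeploymentSketch:
    """Hypothetical stand-in for RunnerDeployment; not Prefect's actual class."""

    name: str
    entrypoint: str
    # Proposed new field: ID of a GCL that may be shared with other deployments.
    global_concurrency_limit_id: Optional[str] = None


def from_deploy_config(deploy_config: dict[str, Any]) -> DeploymentSketch:
    """Map one prefect.yaml deployment entry onto the sketch class."""
    return DeploymentSketch(
        name=deploy_config["name"],
        entrypoint=deploy_config["entrypoint"],
        # Absent key -> None, so existing prefect.yaml files keep working.
        global_concurrency_limit_id=deploy_config.get("global_concurrency_limit_id"),
    )


with_limit = from_deploy_config(
    {"name": "a", "entrypoint": "flows.py:a", "global_concurrency_limit_id": "id-of-the-gcl"}
)
without_limit = from_deploy_config({"name": "b", "entrypoint": "flows.py:b"})
```

Using `.get` keeps the key optional, so deployments that do not reference a shared GCL are unaffected.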
The caveat that I noticed is that a GCL is removed when either of the deployments is deleted. I see this is done unconditionally in prefect/server/models/deployments.py:

async def delete_deployment(
    db: PrefectDBInterface, session: AsyncSession, deployment_id: UUID
) -> bool:
    ...
    await _delete_related_concurrency_limit(
        db, session=session, deployment_id=deployment_id
    )

I guess this would need to be changed so that the GCL is deleted only if it is not shared with any other deployment.
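The check itself could be as simple as the following sketch. It models the deployment-to-GCL mapping as a plain dictionary. In the server this would presumably be a database query, and `should_delete_limit` is a hypothetical helper, not an existing Prefect function.

```python
from typing import Dict, Optional


def should_delete_limit(
    deployment_id: str,
    limit_by_deployment: Dict[str, Optional[str]],
) -> bool:
    """Return True only if no other deployment references the same GCL."""
    limit_id = limit_by_deployment.get(deployment_id)
    if limit_id is None:
        # The deployment has no GCL attached, so there is nothing to delete.
        return False
    # Look for any *other* deployment bound to the same limit.
    shared = any(
        other_id != deployment_id and other_limit == limit_id
        for other_id, other_limit in limit_by_deployment.items()
    )
    return not shared


# Deployments a and b share one GCL; c has its own; d has none.
bindings = {"a": "gcl-1", "b": "gcl-1", "c": "gcl-2", "d": None}
print(should_delete_limit("a", bindings))  # False: still used by b
print(should_delete_limit("c", bindings))  # True: c is the only user
```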
Example Use
- Create a GCL and get its ID:

  prefect gcl create my-special-limit --limit 2
  export GCL_ID=$(prefect gcl ls | grep my-special-limit | awk '{print $2;}')

- Create prefect.yaml with deployment specifications that reference a shared GCL:

  deployments:
    - name: "a"
      version: null
      schedule: {}
      entrypoint: flows.py:a
      global_concurrency_limit_id: "{{ $GCL_ID }}"
      work_pool:
        name: "default"
        work_queue_name: low-priority
        job_variables: {}
    - name: "b"
      version: null
      description: Update APA database XYZ.
      entrypoint: service/jobs.py:update_apa_db
      global_concurrency_limit_id: "{{ $GCL_ID }}"
      work_pool:
        name: "default"
        work_queue_name: high-priority
        job_variables: {}

- Deploy:
prefect --no-prompt deploy --all --prefect-file prefect.yaml
Additional context
No response