
Allow setting a global concurrency limit ID in the deployment YAML (for running prefect deploy) #18715

@anton-daneyko-ultramarin

Description

Describe the current behavior

Currently, it is not possible to set a global concurrency limit shared by multiple deployments using the prefect CLI or the Python SDK. However, I was able to do it with a combination of CLI and REST API calls.

Why do I need it

We have a single K8s worker in our cluster and a single work pool. There are two types of deployments, A and B, that I would like to trigger. I would like to impose a global concurrency limit on both deployments such that at any given moment there are no more than N concurrent runs of A and B combined, that is, number_of_running_flows(A) + number_of_running_flows(B) <= N.
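The invariant running(A) + running(B) <= N is the classic counting-semaphore pattern: runs of both deployment types draw from one shared pool of N slots. A minimal local simulation, assuming asyncio.Semaphore as a stand-in for the Prefect GCL (this only illustrates the semantics and is not how Prefect implements the limit):

```python
import asyncio

# Stand-in for the shared GCL: N "slots" that runs of A and B both
# have to acquire before they start.
N = 2


async def run_flow(kind: str, limit: asyncio.Semaphore, state: dict) -> None:
    async with limit:  # waits while N runs (of any kind) are active
        state["running"][kind] += 1
        total = sum(state["running"].values())
        state["peak"] = max(state["peak"], total)
        await asyncio.sleep(0.01)  # simulated work
        state["running"][kind] -= 1


async def main() -> dict:
    limit = asyncio.Semaphore(N)  # one limit shared by both kinds
    state = {"running": {"A": 0, "B": 0}, "peak": 0}
    runs = [run_flow("A", limit, state) for _ in range(3)]
    runs += [run_flow("B", limit, state) for _ in range(3)]
    await asyncio.gather(*runs)
    return state


state = asyncio.run(main())
print("peak combined runs:", state["peak"])
```

Six runs are submitted, but the combined number of running A and B never exceeds N.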

Currently available solution

I need to create a separate work pool for A and B (and limit its concurrency to N), which requires a dedicated worker for that pool, because AFAIU there is a one-to-one relationship between a worker and a work pool.

What is wrong with that?

I would rather avoid running a separate worker just for that: it seems an unnecessary waste of K8s CPU and memory, and yet another infrastructure component to take care of.

My hack: imposing a concurrency limit on multiple deployments without an additional work pool / worker

My colleague noticed that the REST API endpoint POST /deployments/ accepts a global_concurrency_limit_id parameter that cannot be set through prefect.yaml; this is the key to the hack:

  1. Create two flows that we want to limit:
import prefect
import time


@prefect.flow(name="a")
def a():
    time.sleep(1500)


@prefect.flow(name="b")
def b():
    time.sleep(1500)
  2. Create deployments in prefect.yaml for the flows above.
  3. Create a global concurrency limit (GCL) named two-at-most:
prefect gcl create two-at-most --limit 2

Save the ID of the newly created GCL (I looked it up with prefect gcl ls).
  4. Bind the newly created global concurrency limit two-at-most to both deployments. I used FastAPI's docs page at /api/docs to make the REST calls: call the PATCH /deployments/{id} endpoint with the ID of deployment A and a body like this:

{
  "global_concurrency_limit_id": "id-of-the-gcl"
}

where id-of-the-gcl is the ID of our newly created GCL. Do the same for deployment B.
  5. Trigger two flow runs of deployment A and one flow run of deployment B, and observe that the run of B waits, as enforced by the GCL. 🎊
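Step 4 can also be scripted instead of clicking through /api/docs. A minimal sketch using only the standard library, assuming a self-hosted server whose API root is in PREFECT_API_URL (defaulting here to http://127.0.0.1:4200/api, an assumption) and no authentication; Prefect Cloud would additionally need an Authorization header and a different URL layout. The helper names are hypothetical:

```python
import json
import os
import urllib.request

# Assumption: API root of a local `prefect server start`; adjust as needed.
API_URL = os.environ.get("PREFECT_API_URL", "http://127.0.0.1:4200/api")


def build_gcl_patch(deployment_id: str, gcl_id: str) -> urllib.request.Request:
    """Build (but do not send) PATCH /deployments/{id} binding a GCL."""
    body = json.dumps({"global_concurrency_limit_id": gcl_id}).encode()
    return urllib.request.Request(
        f"{API_URL.rstrip('/')}/deployments/{deployment_id}",
        data=body,
        method="PATCH",
        headers={"Content-Type": "application/json"},
    )


def bind_gcl(deployment_ids: list[str], gcl_id: str) -> None:
    """Send one PATCH per deployment; urlopen raises on HTTP errors."""
    for dep_id in deployment_ids:
        with urllib.request.urlopen(build_gcl_patch(dep_id, gcl_id)) as resp:
            print(dep_id, resp.status)


# Usage (placeholder IDs):
# bind_gcl(["<id-of-deployment-A>", "<id-of-deployment-B>"], "<id-of-the-gcl>")
```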

Describe the proposed behavior

In effect, all the components needed to share one concurrency limit across deployments are already in place; there is just no convenient way to set global_concurrency_limit_id. I would be happy if it were possible to set global_concurrency_limit_id in prefect.yaml.

I looked at the implementation, and it seems that one would need to add a global_concurrency_limit_id field to the prefect.deployments.runner.RunnerDeployment class and pass

        global_concurrency_limit_id=deploy_config.get("global_concurrency_limit_id"),

to the RunnerDeployment constructor call in prefect.cli.deploy._run_single_deploy.
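The proposed change amounts to threading one optional key from the YAML config through the deployment object into the API payload. A minimal sketch of that flow; RunnerDeploymentSketch and run_single_deploy_sketch are hypothetical, trimmed stand-ins (not Prefect's classes), and only the global_concurrency_limit_id field name comes from the REST API:

```python
from dataclasses import dataclass
from typing import Any, Optional


@dataclass
class RunnerDeploymentSketch:
    """Hypothetical stand-in for RunnerDeployment, not Prefect's class."""

    name: str
    entrypoint: str
    # Proposed new field; None keeps today's behaviour.
    global_concurrency_limit_id: Optional[str] = None

    def to_api_payload(self) -> dict[str, Any]:
        payload: dict[str, Any] = {"name": self.name, "entrypoint": self.entrypoint}
        if self.global_concurrency_limit_id is not None:
            # Same field name that POST/PATCH /deployments already accepts.
            payload["global_concurrency_limit_id"] = self.global_concurrency_limit_id
        return payload


def run_single_deploy_sketch(deploy_config: dict[str, Any]) -> dict[str, Any]:
    """Mimics the one-line change proposed for _run_single_deploy."""
    deployment = RunnerDeploymentSketch(
        name=deploy_config["name"],
        entrypoint=deploy_config["entrypoint"],
        global_concurrency_limit_id=deploy_config.get("global_concurrency_limit_id"),
    )
    return deployment.to_api_payload()
```

Omitting the key from the payload when it is unset keeps existing prefect.yaml files behaving exactly as before.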

The caveat I noticed is that the GCL is deleted when either of the deployments is deleted. This is done unconditionally in prefect/server/models/deployments.py:

async def delete_deployment(
    db: PrefectDBInterface, session: AsyncSession, deployment_id: UUID
) -> bool:
...
    await _delete_related_concurrency_limit(
        db, session=session, deployment_id=deployment_id
    )

I guess this would need to be changed to delete the GCL only if it is not shared with any other deployment.
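The "delete only if not shared" rule is a reference check: after removing the deployment, look for any remaining deployment bound to the same GCL. A minimal in-memory sketch of that logic; the dict and set stand in for the database tables (in the server this would presumably be a query in the same session), and delete_deployment_sketch is a hypothetical name:

```python
from typing import Optional
from uuid import UUID, uuid4


def delete_deployment_sketch(
    deployments: dict[UUID, Optional[UUID]],
    gcls: set[UUID],
    deployment_id: UUID,
) -> bool:
    """Delete a deployment; drop its GCL only if no other deployment uses it.

    `deployments` maps deployment ID -> bound GCL ID (or None); `gcls`
    holds existing GCL IDs. Both are in-memory stand-ins for DB tables.
    """
    if deployment_id not in deployments:
        return False
    gcl_id = deployments.pop(deployment_id)
    still_used = gcl_id is not None and any(
        other == gcl_id for other in deployments.values()
    )
    if gcl_id is not None and not still_used:
        gcls.discard(gcl_id)  # last reference gone: safe to delete the GCL
    return True


# Usage: two deployments share one GCL.
shared, dep_a, dep_b = uuid4(), uuid4(), uuid4()
deps = {dep_a: shared, dep_b: shared}
limits = {shared}
delete_deployment_sketch(deps, limits, dep_a)  # GCL kept, B still uses it
delete_deployment_sketch(deps, limits, dep_b)  # GCL removed
```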

Example Use

  1. Create a GCL and get its ID:
prefect gcl create my-special-limit --limit 2
export GCL_ID=$(prefect gcl ls | grep my-special-limit | awk '{print $2;}')
  2. Create a prefect.yaml with deployment specifications that reference the shared GCL:
deployments:
- name: "a"
  version: null
  schedule: {}
  entrypoint: flows.py:a
  global_concurrency_limit_id: "{{ $GCL_ID }}"
  work_pool:
      name: "default"
      work_queue_name: low-priority
      job_variables: {}

- name: "b"
  version: null
  description: Update APA database XYZ.
  entrypoint: service/jobs.py:update_apa_db
  global_concurrency_limit_id: "{{ $GCL_ID }}"
  work_pool:
      name: "default"
      work_queue_name: high-priority
      job_variables: {}
  3. Deploy:
prefect --no-prompt deploy --all --prefect-file prefect.yaml
  4. 💃 🎂 🎊 🎉
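The example relies on prefect.yaml's {{ $VAR }} templating to inject the same GCL ID into both deployments. A minimal sketch of that substitution, for illustration only: it is not Prefect's implementation, resolve_env_placeholders is a hypothetical helper, and leaving unknown variables untouched is an assumption:

```python
import os
import re
from typing import Any, Mapping

# Matches "{{ $NAME }}" placeholders (whitespace inside the braces optional).
_ENV_PLACEHOLDER = re.compile(r"\{\{\s*\$([A-Za-z_][A-Za-z0-9_]*)\s*\}\}")


def resolve_env_placeholders(value: Any, env: Mapping[str, str] = os.environ) -> Any:
    """Recursively replace {{ $VAR }} in strings nested in dicts/lists.

    Unknown variables are left as-is (an assumption of this sketch).
    """
    if isinstance(value, str):
        return _ENV_PLACEHOLDER.sub(lambda m: env.get(m.group(1), m.group(0)), value)
    if isinstance(value, dict):
        return {k: resolve_env_placeholders(v, env) for k, v in value.items()}
    if isinstance(value, list):
        return [resolve_env_placeholders(v, env) for v in value]
    return value


# Usage: both deployments end up bound to the same GCL ID.
deployments = [
    {"name": "a", "global_concurrency_limit_id": "{{ $GCL_ID }}"},
    {"name": "b", "global_concurrency_limit_id": "{{ $GCL_ID }}"},
]
resolved = resolve_env_placeholders(deployments, {"GCL_ID": "1234"})
```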

Additional context

No response

Metadata


Labels: enhancement (an improvement of an existing feature)
