Replies: 4 comments 1 reply
-
It looks like the simpler solution here would be to design the query so that it always gets `max_tis` tasks that can actually be sent to run. This can be done by windowing over pools, which would solve the linked issue. However, I think the scheduler prioritization should also be changed: if we have the same situation as in the issue but within a single pool, we get the same result.

I thought about changing the priority to a weight, so that lower-prioritized tasks also get to run. For example, assume Dag A has priority 5, Dag B has priority 2, both have more ready tasks than there are slots, and 32 slots are available. As of now, Airflow will first complete running Dag A before it starts running Dag B, because the query returns the 32 tasks with the highest priority, and Dag B will starve. If we change the priority to be a weight, meaning that Dag A gets 5 slots for every 2 slots Dag B gets, both Dags get to run at the same time and complete faster overall. This requires a window function over pools and priorities.

There are some edge cases, e.g. the example above but with priorities of 100 and 1. This can be handled by trying to give at least 1 slot to each priority, but then what if we have more than 32 priorities? Choose the top 32 and work only on them? A possible solution is to add a configuration for the maximum number of tasks scheduled for the top priority, or to ignore the numbers themselves and go by the largest, second largest and so on, while splitting the available slots as fairly as possible according to the priority. That has problems of its own: what if we decided to schedule 4 tasks for a given priority but there are only 2 tasks to schedule? Give the rest to the next in line? Or to the most prioritized?

I would love to hear any suggestions you might have, either for simplification or improvement.
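As a rough illustration of the weight idea, a hypothetical helper (not existing Airflow code) that splits the free slots proportionally to the priority weights and tries to give every priority at least one slot:

```python
def split_slots_by_weight(free_slots: int, weights: dict[str, int]) -> dict[str, int]:
    """Split free_slots between dags proportionally to their priority weights.

    Illustration only: with weights {"A": 5, "B": 2} and 32 free slots,
    Dag A gets ~23 slots and Dag B ~9 instead of A taking all 32.
    """
    total_weight = sum(weights.values())
    shares = {dag: free_slots * w // total_weight for dag, w in weights.items()}
    # Try to give every priority at least one slot (edge case such as weights 100 vs 1).
    for dag in shares:
        if shares[dag] == 0 and sum(shares.values()) < free_slots:
            shares[dag] = 1
    # Hand out any remaining slots to the highest-weight dags first.
    leftover = free_slots - sum(shares.values())
    for dag in sorted(weights, key=weights.get, reverse=True):
        if leftover <= 0:
            break
        shares[dag] += 1
        leftover -= 1
    return shares


print(split_slots_by_weight(32, {"A": 5, "B": 2}))  # {'A': 23, 'B': 9}
```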
-
A good point is to clarify the ultimate goal we're striving to achieve. So far, always picking …

```python
starved_dags: set[str] = set()
starved_tasks: set[tuple[str, str]] = set()
starved_tasks_task_dagrun_concurrency: set[tuple[str, str, str]] = set()
```

Note that they are set to be empty and only updated dynamically for the "rare" case that we get to the second iteration. One of them is an exception:

```python
starved_pools = {pool_name for pool_name, stats in pools.items() if stats["open"] <= 0}
```

as it checks the edge case of a fully-occupied pool. It doesn't really matter, as each task specifies its own priority, and the weights are unrelated to pools or any other concurrency limit in our model. In certain configurations this condition will always be unmet (assume `sum(task.pool_slots) != pool["total_slots"]` for some pool), so …

Now we get at least 4 conditions that cause tasks to be dropped. We can enhance the query to window over every possible filter (or some subset of them) and avoid looping at all, knowing that the query returns just the good tasks, though windowing over numerical values may get us into cardinality issues and slow SQL performance.

To explain it better, look at the following scenario: in the first scheduler iteration, 5 tasks from Dag A are scheduled, …

Dropped tasks inevitably cause delays in scheduling, and despite Airflow not being a real-time system, this effect doesn't look like a desirable one, with plenty of reproducible edge cases that occur in real data systems. I wonder if there is an agreement on …

The main problem here is the inability of the scheduling mechanism to handle all the concurrency policies correctly due to task dropping and wasted critical sections. Currently the only practical solution is to increase the number of schedulers until we bump into the bottleneck of the critical section, which won't be able to withstand the heavy load. We can try to compute the …
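For context, a minimal sketch (a simplified assumption, not the exact scheduler code) of how these sets would turn into exclusion filters on the retry query; on the first iteration all of them are empty, so nothing is filtered out:

```python
# Simplified sketch: each starved_* set excludes its "limit holder" from the next
# query attempt. Tuple orderings below are assumptions, not the real column order.
from sqlalchemy import not_, tuple_
from airflow.models.taskinstance import TaskInstance as TI


def exclude_starved(query, starved_pools, starved_dags, starved_tasks,
                    starved_tasks_task_dagrun_concurrency):
    if starved_pools:
        query = query.where(not_(TI.pool.in_(starved_pools)))
    if starved_dags:
        query = query.where(not_(TI.dag_id.in_(starved_dags)))
    if starved_tasks:
        query = query.where(not_(tuple_(TI.dag_id, TI.task_id).in_(starved_tasks)))
    if starved_tasks_task_dagrun_concurrency:
        query = query.where(
            not_(
                tuple_(TI.dag_id, TI.run_id, TI.task_id).in_(
                    starved_tasks_task_dagrun_concurrency
                )
            )
        )
    return query
```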
-
Yep, so far the most promising and scalable approach is "window over everything", which gives us granularity at the level of individual scheduling entities, i.e. objects whose parametrized limits are taken into account: DAG runs and pools. We apparently have to sort the tasks by the policy-defined fields, partition by combinations of limit-holder fields, and try to stuff as many tasks as we can into each such window. We have to consider: …

Here's a pseudo query I wrote to demonstrate this idea:
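A rough SQLAlchemy sketch of such a windowed selection (an illustration with assumed simplifications, covering only the pool and DAG-run windows; `???` marks values that are not stored in the DB):

```python
# Sketch only (assumed models and simplifications, not a drop-in query):
# rank ready tasks inside each limit-holder window and cut every window at its limit.
from sqlalchemy import func, select
from airflow.models.taskinstance import TaskInstance as TI
from airflow.utils.state import TaskInstanceState

max_tis = 32  # scheduler batch size

# Running total of requested slots per pool, highest priority first.
pool_slots_taken = (
    func.sum(TI.pool_slots)
    .over(partition_by=TI.pool, order_by=TI.priority_weight.desc())
    .label("pool_slots_taken")
)
# Position of the task inside its own DAG run.
rank_in_dag_run = (
    func.row_number()
    .over(partition_by=[TI.dag_id, TI.run_id], order_by=TI.priority_weight.desc())
    .label("rank_in_dag_run")
)

ranked = (
    select(TI.dag_id, TI.run_id, TI.task_id, pool_slots_taken, rank_in_dag_run)
    .where(TI.state == TaskInstanceState.SCHEDULED)
    .subquery()
)

# "???": open slots per pool and remaining DAG-run capacity are not columns in the
# DB; for the sketch they are pretended to be single numbers from the concurrency map.
open_pool_slots = 32       # ??? in reality a per-pool value
dag_run_capacity = 16      # ??? in reality a per-DAG-run value

windowed = (
    select(ranked)
    .where(ranked.c.pool_slots_taken <= open_pool_slots)
    .where(ranked.c.rank_in_dag_run <= dag_run_capacity)
    .limit(max_tis)
)
```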
The ??? means the field is not in the DB; it's currently computed manually into a concurrency map, which complicates matters. Maybe it's worth storing these as temporary tables instead, or using a …

I hope I got the right notion of the data models, since the fields are vaguely documented and I relied solely on their names; feel free to correct me if there are mistakes. We can replace …
-
Closing to keep the discussion in one place.
-
The way the critical section works now is:

1. Run the `select` query and get at most `max_tis` task instances to schedule
2. Check the returned task instances against the concurrency limits
3. Drop the ones that exceed a limit, update the `starved_` filters and try again

The third step can cause any number of tasks to be dropped due to concurrency limits (as long as there is at least one ready task found), and only a few tasks will survive. At the same time, ready tasks will queue up in the table without getting the chance to run. This can cause tasks to starve for a long time in edge cases like almost full prioritized pools, as pointed out here:
#45636
We have to rethink the scheduler logic (the query or the loop altogether) to avoid this kind of starvation.
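For illustration, a rough pseudocode sketch of the three-step flow above; the three callables are hypothetical placeholders, not real Airflow functions:

```python
# Rough sketch of the current critical-section flow; select_candidates,
# violates_limit and remember_starved stand in for the real query and bookkeeping.
def critical_section(max_tis, select_candidates, violates_limit, remember_starved):
    executable = []
    starved = {"pools": set(), "dags": set(), "tasks": set(), "tasks_dagrun": set()}
    while len(executable) < max_tis:
        # Step 1: query at most the remaining budget, excluding known-starved entities.
        candidates = select_candidates(max_tis - len(executable), starved)
        if not candidates:
            break
        for ti in candidates:
            if violates_limit(ti):
                # Steps 2-3: the task is dropped, not queued, and its pool/DAG/task is
                # remembered so the next attempt's query can skip it.
                remember_starved(ti, starved)
            else:
                executable.append(ti)
    return executable  # dropped tasks stay "scheduled" and wait for a later loop
```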