-
Thanks for opening your first issue here! Be sure to follow the issue template!
-
Is there any reason you can't have an "all_done" task that checks whether any upstream task succeeded and only then triggers the data loading?
IMHO, it is fully doable, does not require adding any complexity or more complex trigger rules, and it reflects your use case very well (you anyhow need to wait with the decision until all of the ingestion tasks are done). It also gives you a clear signal when nothing was ingested: the "CHECK ALL UPSTREAMS" task will clearly inform you that this was the reason. Checking the state of those tasks can easily be done via XCom (and if you write the "CHECK" task as a @task-decorated operator, it is as simple as checking the output of all of the tasks). Moreover, the "check" task could simply pass the list of all ingested datasets as a single input to the LOAD task, so that it does not have to rely on checking whether any of the tasks failed or not.

For me this is a very nice separation of concerns: one task decides whether to proceed or not, based on a deep analysis of which of those tasks succeeded and which did not, and separately a LOAD task loads whatever you pass to it, without worrying about states there. I am afraid your LOAD task will have to do that kind of check anyway if you want the rule you describe (just inside one multi-purpose task). Splitting those seems much cleaner (and you could make your "CHECK" task reusable across several different LOAD tasks as well, without having to implement the same "select only those that succeeded" logic again). Anything wrong with this approach in your case?
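A minimal sketch of that separation, assuming a recent Airflow 2.x with the TaskFlow API. The task names (ingest_a, ingest_b, check_ingestions, load) and the returned dataset paths are made up for illustration, and the check task assumes that a failed upstream's return value resolves to None when pulled under trigger_rule="all_done":

import pendulum
from airflow.decorators import dag, task
from airflow.exceptions import AirflowFailException


@dag(schedule_interval=None, start_date=pendulum.datetime(2024, 1, 1, tz="UTC"), catchup=False)
def ingest_then_load():

    @task
    def ingest_a():
        # hypothetical ingestion; returns the dataset it produced
        return "lake/source_a/2024-01-01"

    @task
    def ingest_b():
        # simulate an unavailable source
        raise RuntimeError("source B is unavailable")

    @task(trigger_rule="all_done")
    def check_ingestions(results: list):
        # results of failed upstream tasks are assumed to arrive as None here;
        # keep only the datasets that were actually produced
        ingested = [r for r in results if r is not None]
        if not ingested:
            raise AirflowFailException("no ingestion task succeeded, nothing to load")
        return ingested

    @task
    def load(datasets: list):
        # loads whatever it is given, without caring about upstream states
        for dataset in datasets:
            print(f"loading {dataset} into the warehouse")

    load(check_ingestions([ingest_a(), ingest_b()]))


ingest_then_load()

With this layout, check_ingestions owns the "did anything succeed?" decision and load stays a dumb consumer of whatever list it receives, so the same check task can sit in front of any number of different LOAD tasks.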
-
@potiuk your proposed solution sounds really good, thank you! I'd just like to remark that the data loading task doesn't necessarily have to know which data ingestion tasks actually failed/succeeded to make further decisions. E.g., it may simply pick up whatever files match some kind of a pattern. So my idea was to just have a trigger rule handling such simple cases (of course, without introducing extra complexity). Yes, I agree that adding yet another auxiliary task to check the state of the upstream tasks isn't a big deal, but it's still a kind of logic that might arise again and again...

Also, I believe the approach described above is pretty generic, so it can be used to imitate a lot of different behaviors, including but not limited to the one I'm asking for.

I know that what I'm trying to describe covers a pretty simple use case, but I really think that good software should strive to make simple things even easier (and wrong things impossible!), and in this particular case, unfortunately, there isn't an easy way to achieve the desired behavior.
-
Then your LOAD task should fail if it does not find any file matching the pattern (and it can still use all_done). Problem solved.
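A hedged sketch of that variant, assuming the ingestion tasks drop files into a staging directory; the directory path and file pattern here are made up for illustration. The LOAD task runs under trigger_rule="all_done" and fails on its own when nothing matched:

from pathlib import Path

from airflow.decorators import task
from airflow.exceptions import AirflowFailException

# hypothetical location written to by the ingestion tasks
STAGING_DIR = Path("/data/lake/staging")


@task(trigger_rule="all_done")
def load():
    # Pick up whatever the ingestion tasks managed to produce, regardless of their states.
    files = sorted(STAGING_DIR.glob("*.parquet"))
    if not files:
        raise AirflowFailException("no ingested files found, nothing to load")
    for f in files:
        print(f"loading {f} into the warehouse")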
-
Converting it into a discussion. I do not really see a reason for a new trigger rule; it seems that the patterns we have now support the use cases pretty well (either by an intermediate task or by simply failing when no outputs were produced by upstream tasks) - for example with pattern matching, while using all_done.
-
Could there be a solution with implementation of
-
I ran into this problem and, like many, I really wish there was a trigger_rule that would solve this. It makes perfect sense. NOTE: the only important part in this example is the all_done_min_one_success function.

import datetime

import pendulum
from airflow.decorators import dag, task_group, task
from airflow.models.taskinstance import TaskInstance
from airflow.models.dagrun import DagRun
from airflow.exceptions import AirflowTaskTerminated

# pylint: disable=expression-not-assigned,no-value-for-parameter,pointless-statement

RETRIES = 0


@dag(
    dag_id="my_dag",
    schedule_interval=None,
    start_date=pendulum.datetime(2021, 1, 1, tz="UTC"),
    catchup=False,
    dagrun_timeout=datetime.timedelta(minutes=7 * 24 * 60),
)
def my_dag():
    "this is an example dag"

    @task(trigger_rule="all_done")
    def all_done_min_one_success(dag_run: DagRun = None, ti: TaskInstance = None):
        "Workaround task to check if all upstream tasks are done and at least one has succeeded"
        # Get all tasks that are directly upstream of this task.
        # For *all* upstream tasks, not just direct relatives, use 'get_flat_relative_ids(upstream=True)'.
        upstream_task_ids: set[str] = ti.task.get_direct_relative_ids(upstream=True)
        # Get all task instances of this DagRun that have succeeded or failed
        succeeded_task_instances: list[TaskInstance] = dag_run.get_task_instances(state="success")
        failed_task_instances: list[TaskInstance] = dag_run.get_task_instances(state="failed")
        # Intersect the succeeded/failed ids with the direct upstream ids
        succeeded_upstream_task_ids = upstream_task_ids.intersection(
            [t.task_id for t in succeeded_task_instances]
        )
        failed_upstream_task_ids = upstream_task_ids.intersection(
            [t.task_id for t in failed_task_instances]
        )
        # Check whether at least one upstream task has succeeded
        if len(succeeded_upstream_task_ids) >= 1:
            # (Optional) log or print successes and failures
            print(f"the following tasks succeeded: {succeeded_upstream_task_ids}")
            print(f"the following tasks failed: {failed_upstream_task_ids}")
        else:
            # Log the failed tasks and raise an exception, such as AirflowTaskTerminated.
            # For a full list of exceptions, see the documentation here:
            # https://airflow.apache.org/docs/apache-airflow/stable/_api/airflow/exceptions/index.html#exceptions
            print(f"no tasks succeeded. The following tasks failed: {failed_upstream_task_ids}")
            raise AirflowTaskTerminated("no tasks succeeded")

    @task(retries=RETRIES)
    def dummy1():
        print("hello")

    @task(retries=RETRIES)
    def dummy1_1():
        print("world")

    @task(retries=RETRIES)
    def dummy2():
        raise Exception("something has gone wrong")

    @task_group(group_id="my_group")
    def my_group():
        dummy1() >> dummy1_1()
        dummy2()

    my_group() >> all_done_min_one_success() >> dummy1() >> dummy1_1()


dag = my_dag()
-
Description
The trigger rule was requested in #10758 and #17010, but none_failed_min_one_success was proposed as a solution. I find the proposed solution unsatisfactory, since none_failed_min_one_success isn't the same as all_done_min_one_success (the latter rule allows some upstream tasks to fail, as long as at least one of them succeeds). I can try working on a PR if the feature is approved.
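To make the difference concrete, here is a hedged pseudocode-style sketch of the two predicates. This is an illustration only, not Airflow's actual trigger-rule implementation; assume every upstream task has already reached a final state and we count those states:

def none_failed_min_one_success(success: int, failed: int, upstream_failed: int, skipped: int) -> bool:
    # existing rule: no upstream may have failed, and at least one must have succeeded
    return failed == 0 and upstream_failed == 0 and success >= 1


def all_done_min_one_success(success: int, failed: int, upstream_failed: int, skipped: int) -> bool:
    # proposed rule: failures are tolerated, as long as at least one upstream succeeded
    return success >= 1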
Use case/motivation
I have a few data ingestion tasks (each one ingests data from a separate external data source to the internal data lake) and a data loading task (it loads data from the data lake to the data warehouse). I want the data loading task to be triggered if at least one of the data ingestion tasks succeeds. Obviously, if all the data ingestion tasks fail, then there is no point in triggering the data loading task. But if at least one of them succeeds, then in my particular case it's better to load the just ingested data (and figure out why the other tasks failed later) rather than not loading anything at all.
Related issues
Are you willing to submit a PR?
Code of Conduct