9. BranchPythonOperator and Trigger Rules
Thiago Heron Ávila edited this page Jul 6, 2021 · 1 revision
- The BranchPythonOperator callable returns the name (task_id) of the task you want to follow next.
- The name of the next task is stored in XCom.
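As a minimal sketch of the branching described above, assuming Airflow 2.x: the callable passed to BranchPythonOperator returns the task_id to follow. The DAG id, the task ids, the choose_branch function, and its accuracy threshold are hypothetical names chosen for illustration. Airflow is imported inside build_dag only so that the branching function can be tried without an Airflow installation.

```python
from datetime import datetime


def choose_branch(accuracy: float, threshold: float = 0.8) -> str:
    """Return the task_id of the task that should run next (hypothetical logic)."""
    return "accurate" if accuracy >= threshold else "inaccurate"


def build_dag():
    """Wire choose_branch into a small DAG (sketch, assumes Airflow 2.x)."""
    from airflow import DAG
    from airflow.operators.bash import BashOperator
    from airflow.operators.python import BranchPythonOperator

    with DAG(
        "branching_sketch",  # hypothetical DAG id
        schedule_interval="@daily",
        start_date=datetime(2020, 1, 1),
        catchup=False,
    ) as dag:
        branch = BranchPythonOperator(
            task_id="choose_branch",
            python_callable=choose_branch,
            op_kwargs={"accuracy": 0.9},  # hard-coded for illustration only
        )
        accurate = BashOperator(task_id="accurate", bash_command="echo accurate")
        inaccurate = BashOperator(task_id="inaccurate", bash_command="echo inaccurate")

        # Only the task whose task_id was returned runs; the other is skipped.
        branch >> [accurate, inaccurate]
    return dag


# In a real DAG file, expose the DAG at module level so the scheduler finds it:
# dag = build_dag()
```

The returned string must match the task_id of a task directly downstream of the branch task.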
- Trigger rules let you change the default behavior of your tasks. More specifically, they control when a task is triggered, based on the state of its upstream tasks. By default, a task runs only once all of its upstream tasks have succeeded.
- There are 9 different trigger rules for changing how your tasks are triggered.
all_success: If all upstream tasks succeed, the pending task is executed. Each line below shows two scenarios, separated by a slash.
- Task A: Success / Failed
- Task B: Success / Success
- Task C: Success / Upstream Failed
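all_failed: If all upstream tasks fail, the pending task is executed. Each line below shows two scenarios, separated by a slash.
- Task A: Failed / Failed
- Task B: Failed / Success
- Task C: Success / Skipped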
all_done
- Once Task A and Task B have both finished running, whatever their final state, Task C is triggered.
one_success
- As soon as Task A or Task B succeeds, Task C is executed.
one_failed
- As soon as Task A or Task B fails, Task C is executed.
none_failed
- Set on Task C.
- If Task A and Task B each end as Success or Skipped, Task C is executed.
none_failed_or_skipped
- Set on Task C.
- If neither Task A nor Task B fails and at least one of them is Success, Task C is executed.
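The rules above can be summarized as a small pure-Python model. This is a simplified sketch, not Airflow's scheduler logic: the function name should_run and the lowercase state strings are assumptions made for illustration, and the rule is evaluated only after every upstream task has finished (in Airflow, one_success and one_failed can fire earlier).

```python
from typing import List

# States counted as a failure of an upstream task.
FAILED_STATES = {"failed", "upstream_failed"}


def should_run(rule: str, upstream_states: List[str]) -> bool:
    """Return True if a task with the given trigger rule would run.

    Hypothetical helper: assumes every upstream task has already finished
    as one of: success, failed, upstream_failed, skipped.
    """
    total = len(upstream_states)
    succeeded = sum(state == "success" for state in upstream_states)
    failed = sum(state in FAILED_STATES for state in upstream_states)

    if rule == "all_success":
        return succeeded == total
    if rule == "all_failed":
        return failed == total
    if rule == "all_done":
        return True  # every upstream task is finished by assumption
    if rule == "one_success":
        return succeeded >= 1
    if rule == "one_failed":
        return failed >= 1
    if rule == "none_failed":
        return failed == 0
    if rule == "none_failed_or_skipped":
        return failed == 0 and succeeded >= 1
    raise ValueError(f"unknown trigger rule: {rule}")


# The two all_success scenarios (Task A, Task B):
print(should_run("all_success", ["success", "success"]))  # True
print(should_run("all_success", ["failed", "success"]))   # False
```

The model only answers "runs or not". It does not distinguish whether a task that does not run ends up as Skipped or Upstream Failed.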
```python
from airflow.operators.bash import BashOperator
from airflow.operators.python import PythonOperator, BranchPythonOperator
from airflow.utils.task_group import TaskGroup
from airflow import DAG

from datetime import datetime

default_args = {
    'start_date': datetime(2020, 1, 1)
}

with DAG('dag_using_trigger_rules', schedule_interval='@daily', default_args=default_args, catchup=False) as dag:

    # Exit code 0: the task is marked as success
    # Exit code 1: the task is marked as failed

    # Default rule (all_success): task_3 runs because task_1 and task_2 succeed.
    with TaskGroup('default_rules') as default_rules:
        task_1 = BashOperator(
            task_id="task_1",
            bash_command="exit 0",
            do_xcom_push=False,
        )

        task_2 = BashOperator(
            task_id="task_2",
            bash_command="exit 0",
            do_xcom_push=False,
        )

        task_3 = BashOperator(
            task_id="task_3",
            bash_command="exit 0",
            do_xcom_push=False,
        )

        [task_1, task_2] >> task_3

    # all_failed: task_3 runs because task_1 and task_2 both fail.
    with TaskGroup('all_failed') as all_failed:
        task_1 = BashOperator(
            task_id="task_1",
            bash_command="exit 1",
            do_xcom_push=False,
        )

        task_2 = BashOperator(
            task_id="task_2",
            bash_command="exit 1",
            do_xcom_push=False,
        )

        task_3 = BashOperator(
            task_id="task_3",
            bash_command="exit 0",
            do_xcom_push=False,
            trigger_rule="all_failed"
        )

        [task_1, task_2] >> task_3

    # all_done: task_3 runs once task_1 (failed) and task_2 (success) have finished.
    with TaskGroup('all_done') as all_done:
        task_1 = BashOperator(
            task_id="task_1",
            bash_command="exit 1",
            do_xcom_push=False,
        )

        task_2 = BashOperator(
            task_id="task_2",
            bash_command="exit 0",
            do_xcom_push=False,
        )

        task_3 = BashOperator(
            task_id="task_3",
            bash_command="exit 0",
            do_xcom_push=False,
            trigger_rule="all_done"
        )

        [task_1, task_2] >> task_3

    # one_failed: task_3 runs as soon as task_1 fails,
    # without waiting for task_2 to finish its 30-second sleep.
    with TaskGroup('one_failed') as one_failed:
        task_1 = BashOperator(
            task_id="task_1",
            bash_command="exit 1",
            do_xcom_push=False,
        )

        task_2 = BashOperator(
            task_id="task_2",
            bash_command="sleep 30",
            do_xcom_push=False,
        )

        task_3 = BashOperator(
            task_id="task_3",
            bash_command="exit 0",
            do_xcom_push=False,
            trigger_rule="one_failed"
        )

        [task_1, task_2] >> task_3
```
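Branching and trigger rules usually meet at the task that joins the branches back together. With the default rule, that join task would be skipped, because one of its upstream branches is always skipped. The sketch below assumes Airflow 2.x and sets none_failed_or_skipped on the join (later Airflow releases name this rule none_failed_min_one_success). The DAG id, the task ids, and the pick_branch function are hypothetical. Airflow is imported inside build_dag only so that pick_branch can be tried on its own.

```python
from datetime import datetime


def pick_branch(ds: str, **_) -> str:
    """Return the task_id to follow, based on the run date.

    ds is the run's date as a YYYY-MM-DD string, taken from the task context.
    """
    weekday = datetime.strptime(ds, "%Y-%m-%d").weekday()  # Monday is 0
    return "weekday_task" if weekday < 5 else "weekend_task"


def build_dag():
    """Branch into one of two tasks, then join (sketch, assumes Airflow 2.x)."""
    from airflow import DAG
    from airflow.operators.bash import BashOperator
    from airflow.operators.python import BranchPythonOperator

    with DAG(
        "branch_then_join_sketch",  # hypothetical DAG id
        schedule_interval="@daily",
        start_date=datetime(2020, 1, 1),
        catchup=False,
    ) as dag:
        branch = BranchPythonOperator(
            task_id="pick_branch",
            python_callable=pick_branch,
        )
        weekday_task = BashOperator(task_id="weekday_task", bash_command="echo weekday")
        weekend_task = BashOperator(task_id="weekend_task", bash_command="echo weekend")

        # Runs when no upstream task failed and at least one succeeded,
        # so the skipped branch does not cause the join to be skipped.
        join = BashOperator(
            task_id="join",
            bash_command="echo done",
            trigger_rule="none_failed_or_skipped",
        )

        branch >> [weekday_task, weekend_task] >> join
    return dag


# In a real DAG file, expose the DAG at module level so the scheduler finds it:
# dag = build_dag()
```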