
9. BranchPythonOperator and Trigger Rules

Thiago Heron Ávila edited this page Jul 6, 2021 · 1 revision

9. Choose Task (BranchPythonOperator) and Trigger Rules

  • The Python callable returns the task_id of the task that should run next.
  • The returned task name is stored in XCom.
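
The two points above can be sketched as follows. This is a minimal illustration, not the page author's DAG: the task ids `accurate` and `inaccurate`, the DAG id, the threshold, and the hard-coded `accuracy` value are all assumptions made for the example. Airflow is imported inside `build_dag` so that the branching logic itself can be tried without Airflow installed.

```python
def choose_next_task(accuracy: float, threshold: float = 0.9) -> str:
    """Return the task_id of the branch that should run next."""
    return "accurate" if accuracy > threshold else "inaccurate"


def build_dag():
    """Wire the callable into a DAG (requires Apache Airflow 2.x)."""
    # Imported lazily so choose_next_task can be exercised without Airflow.
    from datetime import datetime

    from airflow import DAG
    from airflow.operators.bash import BashOperator
    from airflow.operators.python import BranchPythonOperator

    with DAG(
        "dag_using_branching",  # hypothetical DAG id
        start_date=datetime(2020, 1, 1),
        schedule_interval="@daily",
        catchup=False,
    ) as dag:
        choose_task = BranchPythonOperator(
            task_id="choose_task",
            python_callable=choose_next_task,
            op_kwargs={"accuracy": 0.95},  # hard-coded for illustration only
        )
        accurate = BashOperator(task_id="accurate", bash_command="echo accurate")
        inaccurate = BashOperator(task_id="inaccurate", bash_command="echo inaccurate")

        # Only the task whose task_id is returned runs; the other one is skipped.
        choose_task >> [accurate, inaccurate]
    return dag
```

Because the operator pushes its return value to XCom by default, the chosen task_id can be inspected afterwards in the XCom view of `choose_task`.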

How to fix this?


Trigger Rules


  • Trigger rules change the default behavior of a task: they define which states its upstream tasks must reach before the task is triggered.
  • There are 9 different trigger rules to choose from.

all_success: if all upstream tasks succeed, the pending task is executed. This is the default rule. The states below show two scenarios, written as first / second.

  • Task A: Success / Failed
  • Task B: Success / Success
    • Task C: Success / Upstream Failed

all_failed: the pending task is executed only if all upstream tasks failed.

  • Task A: Failed / Failed
  • Task B: Failed / Success
    • Task C: Success / Skipped

all_done

  • Task C is triggered once Task A and Task B have both finished, whatever their final states.

one_success

  • Task C is executed as soon as Task A or Task B succeeds, without waiting for the other one.

one_failed

  • Task C is executed as soon as Task A or Task B fails.

none_failed

  • Set on Task C.
  • If Task A and Task B are each Success or Skipped, Task C is executed.

none_failed_or_skipped

  • Set on Task C.
  • If neither Task A nor Task B failed and at least one of them is Success, Task C is executed.
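
The rules listed above can be checked against a small pure-Python model. This is only a sketch of the semantics, not Airflow's scheduler code: it assumes every upstream task has already finished, it answers only whether Task C runs (not whether it ends up Skipped or Upstream Failed), and the names `should_run`, `RULES`, and `FAILED_STATES` are invented for the example.

```python
from typing import Callable, Dict, Sequence

# States that count as a failure of an upstream task.
FAILED_STATES = {"failed", "upstream_failed"}

# One predicate per trigger rule, evaluated on the final upstream states.
RULES: Dict[str, Callable[[Sequence[str]], bool]] = {
    "all_success": lambda s: all(x == "success" for x in s),
    "all_failed": lambda s: all(x in FAILED_STATES for x in s),
    "all_done": lambda s: True,  # upstream tasks are assumed finished
    "one_success": lambda s: any(x == "success" for x in s),
    "one_failed": lambda s: any(x in FAILED_STATES for x in s),
    "none_failed": lambda s: not any(x in FAILED_STATES for x in s),
    "none_failed_or_skipped": lambda s: (
        not any(x in FAILED_STATES for x in s)
        and any(x == "success" for x in s)
    ),
}


def should_run(rule: str, upstream_states: Sequence[str]) -> bool:
    """Return True if a task with this trigger rule runs for these upstream states."""
    return RULES[rule](upstream_states)


if __name__ == "__main__":
    # Task A failed, Task B succeeded: which rules let Task C run?
    for rule in RULES:
        print(rule, should_run(rule, ["failed", "success"]))
```

Note that in a real run one_success and one_failed fire as soon as a single upstream task reaches the required state, which this after-the-fact model does not capture.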

The following DAG exercises several of these rules, with one TaskGroup per rule:

from airflow.operators.bash import BashOperator
from airflow.operators.python import PythonOperator, BranchPythonOperator
from airflow.utils.task_group import TaskGroup

from airflow import DAG
from datetime import datetime

default_args = {
    'start_date': datetime(2020, 1, 1)
}

with DAG('dag_using_trigger_rules', schedule_interval='@daily', default_args=default_args, catchup=False) as dag:
    # Exit code 0 marks a task as success.
    # Exit code 1 marks a task as failed.
    
    with TaskGroup('default_rules') as default_rules:
        task_1 = BashOperator(
            task_id="task_1",
            bash_command="exit 0",
            do_xcom_push=False,
        )

        task_2 = BashOperator(
            task_id="task_2",
            bash_command="exit 0",
            do_xcom_push=False,
        )

        task_3 = BashOperator(
            task_id="task_3",
            bash_command="exit 0",
            do_xcom_push=False,
        )

        [task_1, task_2] >> task_3

    with TaskGroup('all_failed') as all_failed:
        task_1 = BashOperator(
            task_id="task_1",
            bash_command="exit 1",
            do_xcom_push=False,
        )

        task_2 = BashOperator(
            task_id="task_2",
            bash_command="exit 1",
            do_xcom_push=False,
        )

        task_3 = BashOperator(
            task_id="task_3",
            bash_command="exit 0",
            do_xcom_push=False,
            trigger_rule="all_failed",
        )
        [task_1, task_2] >> task_3

    with TaskGroup('all_done') as all_done:
        task_1 = BashOperator(
            task_id="task_1",
            bash_command="exit 1",
            do_xcom_push=False,
        )

        task_2 = BashOperator(
            task_id="task_2",
            bash_command="exit 0",
            do_xcom_push=False,
        )

        task_3 = BashOperator(
            task_id="task_3",
            bash_command="exit 0",
            do_xcom_push=False,
            trigger_rule="all_done",
        )
        [task_1, task_2] >> task_3

    
    with TaskGroup('one_failed') as one_failed:
        task_1 = BashOperator(
            task_id="task_1",
            bash_command="exit 1",
            do_xcom_push=False,
        )

        task_2 = BashOperator(
            task_id="task_2",
            bash_command="sleep 30",
            do_xcom_push=False,
        )

        task_3 = BashOperator(
            task_id="task_3",
            bash_command="exit 0",
            do_xcom_push=False,
            trigger_rule="one_failed",
        )
        [task_1, task_2] >> task_3
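
The groups in the DAG above all follow the same shape: two upstream tasks feeding one downstream task with a given trigger rule. As a possible way to cover rules the DAG does not exercise, the sketch below factors that shape into a helper. The helper name `add_rule_group` and the `RULE_SCENARIOS` mapping are hypothetical, and the helper is meant to be called inside a `with DAG(...)` block like the one above. Airflow is imported inside the function so the mapping can be inspected without Airflow installed.

```python
from typing import Dict, Sequence, Tuple

# Hypothetical scenarios: trigger rule -> bash commands of the two upstream tasks.
RULE_SCENARIOS: Dict[str, Tuple[str, str]] = {
    "one_success": ("exit 1", "exit 0"),  # one upstream fails, one succeeds
    "none_failed": ("exit 0", "exit 0"),  # no upstream fails
}


def add_rule_group(rule: str, upstream_commands: Sequence[str]):
    """Create a TaskGroup named after `rule`; call inside a `with DAG(...)` block."""
    # Imported lazily so RULE_SCENARIOS can be used without Airflow installed.
    from airflow.operators.bash import BashOperator
    from airflow.utils.task_group import TaskGroup

    with TaskGroup(rule) as group:
        # Upstream tasks: task_1, task_2, ... one per command.
        upstream = [
            BashOperator(
                task_id=f"task_{index}",
                bash_command=command,
                do_xcom_push=False,
            )
            for index, command in enumerate(upstream_commands, start=1)
        ]
        # Downstream task whose triggering is governed by `rule`.
        downstream = BashOperator(
            task_id="task_3",
            bash_command="exit 0",
            do_xcom_push=False,
            trigger_rule=rule,
        )
        upstream >> downstream
    return group
```

Inside the DAG, a loop such as `for rule, commands in RULE_SCENARIOS.items(): add_rule_group(rule, commands)` would then add one group per scenario.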