3. Example Data Pipeline

  1. If you have not created airflow/dags yet, create it with mkdir dags/

airflow/dags/user_processing.py

# Create file airflow/dags/user_processing.py

from airflow.models import DAG
from airflow.providers.sqlite.operators.sqlite import SqliteOperator
from airflow.providers.http.sensors.http import HttpSensor
from airflow.providers.http.operators.http import SimpleHttpOperator
from airflow.operators.python import PythonOperator
from airflow.operators.bash import BashOperator

from datetime import datetime
from pandas import json_normalize
import json

default_args = {
    'start_date': datetime(2020, 1, 1),
}

def _processing_user(ti):
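    # xcom_pull with a list of task_ids returns a list with one XCom value per task id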
    users = ti.xcom_pull(task_ids=["extracting_user"])
    if not len(users) or "results" not in users[0]:
        raise ValueError("User is empty")

    user = users[0]["results"][0]
    df_processed_user = json_normalize({
        "firstname": user["name"]["first"],
        "lastname": user["name"]["last"],
        "country": user["location"]["country"],
        "username": user["login"]["username"],
        "password": user["login"]["password"],
        "email": user["email"],
    })
    df_processed_user.to_csv("/tmp/processed_user.csv", index=False, header=False)

with DAG(
    "user_processing",
    schedule_interval="@daily",
    default_args=default_args,
    catchup=False
) as dag:

    creating_table = SqliteOperator(
	    task_id="creating_table",
        sqlite_conn_id="db_sqlite",
        sql='''
            CREATE TABLE IF NOT EXISTS users (
            firstname TEXT NOT NULL,
            lastname TEXT NOT NULL,
            country TEXT NOT NULL,
            username TEXT NOT NULL,
            password TEXT NOT NULL,
            email TEXT NOT NULL PRIMARY KEY
        );
        '''
    )

    # Is API available
    # pip install 'apache-airflow-providers-http'
    is_api_available = HttpSensor(
	    task_id="is_api_available",
        http_conn_id="user_api",
        endpoint="api/" # Route Status
    )

    # Extracting Users
    # Fetch Result
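    # response_filter transforms the raw response; its return value is what gets pushed to XCom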
    extracting_user = SimpleHttpOperator(
        task_id="extracting_user",
        http_conn_id="user_api",
        endpoint="api/",
        method="GET",
        response_filter=lambda response: json.loads(response.text),
        log_response=True,
    )

    # Processing User
    # ls /tmp/
    processing_user = PythonOperator(
        task_id="processing_user",
        python_callable=_processing_user,
    )

    # Store User
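    # Pipes the .separator and .import meta-commands into sqlite3 to load the CSV into the users table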
    storing_user = BashOperator(
        task_id="storing_user",
        bash_command='echo -e ".separator ","\n.import /tmp/processed_user.csv users" | sqlite3 /home/thiagoheron/airflow/airflow.db'
    )
     
    # Dependencies
    creating_table >> is_api_available >> extracting_user >> processing_user >> storing_user
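
To sanity-check the processing step outside Airflow, here is a small sketch that runs the same json_normalize call against a made-up payload shaped like a randomuser.me response (the sample values below are invented for illustration):

# sanity_check_processing.py (sketch, not part of the DAG)
from pandas import json_normalize

sample_response = {
    "results": [{
        "name": {"first": "Jane", "last": "Doe"},
        "location": {"country": "Brazil"},
        "login": {"username": "janedoe", "password": "secret"},
        "email": "jane.doe@example.com",
    }]
}

user = sample_response["results"][0]
df = json_normalize({
    "firstname": user["name"]["first"],
    "lastname": user["name"]["last"],
    "country": user["location"]["country"],
    "username": user["login"]["username"],
    "password": user["login"]["password"],
    "email": user["email"],
})
# Prints the same single CSV row that the DAG writes to /tmp/processed_user.csv
print(df.to_csv(index=False, header=False))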

Create the SQLite and API Connections


  • Menu → Admin → Connections → New
# SQLite
Conn ID: db_sqlite
Conn Type: Sqlite
Description: SQLite connection to the DB.
Host: /home/thiagoheron/airflow/airflow.db
SAVE!!!

# HTTP
Conn ID: user_api
Conn Type: HTTP
Description: API for getting users.
Host: https://randomuser.me/
SAVE!!!
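
If you prefer not to click through the UI, the same two connections can also be created from Python. This is only a sketch that writes directly to the Airflow metadata session, using the same values shown above:

# create_connections.py (sketch)
from airflow import settings
from airflow.models import Connection

session = settings.Session()
for conn in (
    Connection(conn_id="db_sqlite", conn_type="sqlite",
               host="/home/thiagoheron/airflow/airflow.db",
               description="SQLite connection to the DB."),
    Connection(conn_id="user_api", conn_type="http",
               host="https://randomuser.me/",
               description="API for getting users."),
):
    # Skip connections that already exist in the metadata database
    if not session.query(Connection).filter(Connection.conn_id == conn.conn_id).first():
        session.add(conn)
session.commit()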

Testing Tasks (Best Practice)


  • For each task added to the pipeline, you should test it:
# How to Test?
airflow tasks test dag_id task_id execution_date

# Test Task 1 - Create the users table
airflow tasks test user_processing creating_table 2020-01-01

# Test Task 2 - Check if the API is available
airflow tasks test user_processing is_api_available 2020-01-01

Enter SQLite

# Enter the SQLite DB
sqlite3 airflow.db

# List All Tables
.tables

# Nothing there yet.
SELECT * FROM users;
  • Catchup = True (will create multiple DAG runs until it reaches the current day, or current minute, according to your schedule interval)

  • Catchup = False (runs only the latest one?) and not all of those days; it creates only one DAG run

  • Default is True for all DAGs - it will trigger all the non-triggered DAG runs starting from the latest execution date (see the timeline and the sketch below)

DAG A - 2020-01-01 - @daily
---------------------------
01-01 DagRun 1 (DAG A)
02-01 DagRun 2 (DAG A) - If you pause the DAG here
03-01 DagRun 3 (DAG A)

04-01 DagRun 4 (DAG A) - Will not be run
05-01 DagRun 5 (DAG A) - Will not be run
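
As a minimal sketch, catchup can also be set directly on the DAG instead of relying on the default; with catchup=True Airflow backfills one DAG run per missed daily interval since start_date:

from datetime import datetime
from airflow.models import DAG

with DAG(
    "user_processing",
    schedule_interval="@daily",
    start_date=datetime(2020, 1, 1),
    catchup=True,  # backfill every missed interval; set False to run only the latest
) as dag:
    ...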

All execution dates are in UTC

Change in airflow.cfg: default_ui_timezone = utc and default_timezone = utc

Keep UTC (tip) - keep everything in UTC
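
To make the UTC choice explicit in the DAG code itself, one option (a sketch) is a timezone-aware start_date built with pendulum, which ships with Airflow:

import pendulum

default_args = {
    # Explicitly timezone-aware start_date in UTC
    'start_date': pendulum.datetime(2020, 1, 1, tz="UTC"),
}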
