3. Example Data Pipeline
- If you haven't created airflow/dags yet, then:
mkdir dags/
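The expected layout (assuming AIRFLOW_HOME points at ~/airflow) is:
airflow/
├── airflow.cfg
├── airflow.db
└── dags/
    └── user_processing.py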
airflow/dags/user_processing.py
# Create file airflow/dags/user_processing.py
from airflow.models import DAG
from airflow.providers.sqlite.operators.sqlite import SqliteOperator
from airflow.providers.http.sensors.http import HttpSensor
from airflow.providers.http.operators.http import SimpleHttpOperator
from airflow.operators.python import PythonOperator
from airflow.operators.bash import BashOperator

from datetime import datetime
from pandas import json_normalize
import json

default_args = {
    'start_date': datetime(2020, 1, 1),
}

def _processing_user(ti):
    # Pull the JSON returned by the extracting_user task via XCom
    users = ti.xcom_pull(task_ids=["extracting_user"])
    if not len(users) or "results" not in users[0]:
        raise ValueError("User is empty")
    user = users[0]["results"][0]
    # Flatten the nested JSON into a single-row DataFrame
    df_processed_user = json_normalize({
        "firstname": user["name"]["first"],
        "lastname": user["name"]["last"],
        "country": user["location"]["country"],
        "username": user["login"]["username"],
        "password": user["login"]["password"],
        "email": user["email"],
    })
    df_processed_user.to_csv("/tmp/processed_user.csv", index=None, header=False)

with DAG(
    "user_processing",
    schedule_interval="@daily",
    default_args=default_args,
    catchup=False
) as dag:

    # Create the users table if it does not exist yet
    creating_table = SqliteOperator(
        task_id="creating_table",
        sqlite_conn_id="db_sqlite",
        sql='''
            CREATE TABLE IF NOT EXISTS users (
                firstname TEXT NOT NULL,
                lastname TEXT NOT NULL,
                country TEXT NOT NULL,
                username TEXT NOT NULL,
                password TEXT NOT NULL,
                email TEXT NOT NULL PRIMARY KEY
            );
        '''
    )

    # Is the API available?
    # pip install 'apache-airflow-providers-http'
    is_api_available = HttpSensor(
        task_id="is_api_available",
        http_conn_id="user_api",
        endpoint="api/"  # Route status
    )

    # Extracting users: fetch the result from the API
    extracting_user = SimpleHttpOperator(
        task_id="extracting_user",
        http_conn_id="user_api",
        endpoint="api/",
        method="GET",
        response_filter=lambda response: json.loads(response.text),
        log_response=True,
    )

    # Processing user: writes /tmp/processed_user.csv (check with ls /tmp/)
    processing_user = PythonOperator(
        task_id="processing_user",
        python_callable=_processing_user,
    )

    # Store user: import the CSV into SQLite
    storing_user = BashOperator(
        task_id="storing_user",
        bash_command='echo -e ".separator ","\n.import /tmp/processed_user.csv users" | sqlite3 /home/thiagoheron/airflow/airflow.db'
    )

    # Dependencies
    creating_table >> is_api_available >> extracting_user >> processing_user >> storing_user
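A quick sanity check (a sketch; paths assume the layout above) to make sure the file parses and the DAG is registered:
python airflow/dags/user_processing.py
airflow dags list | grep user_processing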
- Menu → Admin → Connections → New
# SQLite
Conn ID: db_sqlite
Conn Type: Sqlite
Description: SQLite connection to the DB.
Host: /home/thiagoheron/airflow/airflow.db
SAVE!!!
# HTTP
Conn ID: user_api
Conn Type: HTTP
Description: API for getting users.
Host: https://randomuser.me/
SAVE!!!
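Alternatively, a sketch using the Airflow 2 CLI (same connection IDs as above; the description can still be set in the UI):
airflow connections add 'db_sqlite' \
    --conn-type 'sqlite' \
    --conn-host '/home/thiagoheron/airflow/airflow.db'
airflow connections add 'user_api' \
    --conn-type 'http' \
    --conn-host 'https://randomuser.me/'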
- For each task added to the pipeline, you should test it:
# How to Test?
airflow tasks test dag_id task_id execution_date
# Test Task 1 - Create the users table
airflow tasks test user_processing creating_table 2020-01-01
# Test Task 2 - Check if the API is available
airflow tasks test user_processing is_api_available 2020-01-01
# Open the SQLite DB
sqlite3 airflow.db
# List All Tables
.tables
# Nothing yet.
SELECT * FROM users;
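The remaining tasks can be tested the same way; after storing_user succeeds, the SELECT above should return the imported row:
# Test Task 3 - Fetch a user from the API
airflow tasks test user_processing extracting_user 2020-01-01
# Test Task 4 - Process the user (writes /tmp/processed_user.csv)
airflow tasks test user_processing processing_user 2020-01-01
cat /tmp/processed_user.csv
# Test Task 5 - Store the user in SQLite
airflow tasks test user_processing storing_user 2020-01-01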
- Catchup = True (will create multiple DAG runs until it reaches your current day, or current minute, according to your schedule interval)
- Catchup = False (runs only the latest interval), not all those days; it creates a single DAG run
- catchup defaults to True for all DAGs - it will trigger all the non-triggered DAG runs starting from the latest execution date
DAG A - 2020-01-01 - @daily
---------------------------
01-01 DagRun 1 (DAG A)
02-01 DagRun 2 (DAG A) - If you pause the DAG here
03-01 DagRun 3 (DAG A)
04-01 DagRun 4 (DAG A) - Will not be run
05-01 DagRun 5 (DAG A) - Will not be run
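A minimal sketch of the same idea (catchup_demo is a hypothetical DAG id, for illustration only):
from airflow.models import DAG
from datetime import datetime

with DAG(
    "catchup_demo",              # hypothetical DAG id
    schedule_interval="@daily",
    start_date=datetime(2020, 1, 1),
    catchup=True,    # backfills one DagRun per missed daily interval since start_date
) as dag:
    ...              # with catchup=False, only the latest interval gets a DagRun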
All execution dates are in UTC.
To change it, edit airflow.cfg: default_ui_timezone = UTC and default_timezone = utc
Tip: keep everything in UTC.
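If you do need to pin the timezone explicitly, a sketch using pendulum, which ships with Airflow (utc_demo is a hypothetical DAG id):
import pendulum
from airflow.models import DAG

with DAG(
    "utc_demo",  # hypothetical DAG id
    schedule_interval="@daily",
    start_date=pendulum.datetime(2020, 1, 1, tz="UTC"),  # timezone-aware start_date
    catchup=False,
) as dag:
    ...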