Este projeto implementa um pipeline de extração de dados utilizando Apache Airflow, Docker e PostgreSQL para automatizar a coleta de dados de múltiplas fontes e carregá-los em um Data Warehouse.
O pipeline realiza extração de dados de:
- Banco de dados PostgreSQL (origem)
- Arquivo CSV com transações de clientes
Os dados extraídos são processados e carregados em um Data Warehouse containerizado para análise posterior.
┌─────────────────┐ ┌─────────────────┐ ┌─────────────────┐
│ PostgreSQL │ │ CSV File │ │ Data Lake │
│ (Origem) │───▶│ (Transações) │───▶│ (Storage) │
└─────────────────┘ └─────────────────┘ └─────────────────┘
│
▼
┌─────────────────┐
│ Data Warehouse │
│ (PostgreSQL) │
└─────────────────┘
- Apache Airflow - Orquestração de workflows
- Astro CLI - Desenvolvimento local do Airflow
- Docker & Docker Compose - Containerização
- PostgreSQL - Banco de dados origem e Data Warehouse
- Pandas - Processamento de dados
- Python 3.12 - Linguagem principal
project/
├── dags/
│ └── pipeline_extraction.py # DAG principal
├── include/
│ ├── transacoes.csv # Arquivo CSV de origem
│ └── datalake/ # Diretório de armazenamento temporário
├── docker-compose.yml
├── Dockerfile
├── requirements.txt
└── README.md
- Docker e Docker Compose instalados
- Astro CLI instalado (
pip install astro-cli
) - Python 3.8+
- Clone o repositório
git clone https://github.com/luizfernandoOliveiraa/Airflow_Project.git
cd pipeline-extraction
- Inicie o ambiente com Astro
astro dev start
- Configure as conexões no Airflow
Acesse http://localhost:8080
e configure as seguintes conexões:
- Connection Type: Postgres
- Host: db_origem
- Schema: db_origem
- Login: dborigem_teste
- Password: dborigem_teste2025
- Port: 5432
- Connection Type: Postgres
- Host: db_datawarehouse
- Schema: db_datawarehouse
- Login: db_datawarehouse_teste2025
- Password: db_datawarehouse_teste2025
- Port: 5432
- Acesse a interface do Airflow em
http://localhost:8080
- Localize a DAG
pipeline_extraction
- Ative a DAG e execute manualmente ou aguarde o agendamento
A DAG está configurada para executar diariamente às 4:35 AM:
schedule='35 4 * * *'
agencias
clientes
colaborador_agencia
colaboradores
contas
propostas_credito
transacoes
(dados de transações de clientes)
-
Extração PostgreSQL (
postgres_extract
)- Conecta no banco de origem
- Extrai dados das tabelas configuradas
- Salva no data lake como CSV
-
Extração CSV (
extract_csv
)- Lê arquivo de transações
- Processa e salva no data lake
-
Carregamento Data Warehouse (
load_dw
)- Carrega todos os CSVs do data lake
- Insere/atualiza tabelas no DW
- Limpa dados temporários
- Logs detalhados disponíveis na interface do Airflow
- Tratamento de erros implementado em todas as tasks
- Notificações de falha configuráveis
- Adicione o nome da tabela em
SOURCE_TABLES
- Reinicie a DAG
Altere o parâmetro schedule
na definição da DAG:
@dag(
dag_id="pipeline_extraction",
schedule='0 6 * * *', # 6:00 AM diário
...
)
Erro de Conexão com Banco
- Verifique se as conexões estão configuradas corretamente
- Confirme se os containers estão rodando
Arquivo CSV não encontrado
- Verifique se o arquivo está em
/usr/local/airflow/include/transacoes.csv
- Confirme as permissões de leitura
Falha no Carregamento do DW
- Verifique espaço em disco
- Confirme credenciais do Data Warehouse
# Ver logs dos containers
docker-compose logs -f
# Reiniciar ambiente Airflow
astro dev restart
# Acessar container Airflow
astro dev bash
# Parar ambiente
astro dev stop
- Fork o projeto
- Crie uma branch para sua feature (
git checkout -b feature/NovaFuncionalidade
) - Commit suas mudanças (
git commit -m 'Adiciona nova funcionalidade'
) - Push para a branch (
git push origin feature/NovaFuncionalidade
) - Abra um Pull Request
Obs: Esse projeto foi desenvolvido em outro ambiente restrito, e adaptado para esse repositório, por isso temos poucos commits e sem desenvolvimento de testes.