pyspark-dev
A comprehensive PySpark data processing framework designed for Amazon EMR Serverless with Apache Airflow integration. This project provides scalable ETL pipelines for processing and transforming digital land data collections.
This repository contains PySpark jobs that process various digital land datasets including:
- Transport access nodes
- Title boundaries
- Entity data transformations
- Fact and fact resource processing
- Issue tracking and validation
- ✅ EMR Serverless Ready: Optimized for AWS EMR Serverless execution
- ✅ Airflow Integration: DAGs for orchestrating data workflows
- ✅ Modular Design: Reusable transformation components
- ✅ Comprehensive Testing: Unit, integration, and acceptance tests with pytest
- ✅ Configuration Management: JSON-based dataset and schema configuration
- ✅ AWS Secrets Integration: Secure credential management
- ✅ Multiple Output Formats: Support for Parquet, CSV, and database outputs
pyspark-jobs/
├── src/                                  # Source code
│   ├── jobs/                             # Core PySpark job modules
│   │   ├── main_collection_data.py       # Main ETL pipeline
│   │   ├── transform_collection_data.py  # Data transformation logic
│   │   ├── run_main.py                   # EMR entry point script
│   │   ├── config/                       # Configuration files
│   │   │   ├── datasets.json             # Dataset definitions
│   │   │   └── transformed_source.json   # Schema configurations
│   │   └── dbaccess/                     # Database connectivity modules
│   ├── utils/                            # Utility modules
│   │   ├── aws_secrets_manager.py        # AWS Secrets Manager integration
│   │   └── path_utils.py                 # Path resolution utilities
│   ├── airflow/                          # Airflow DAGs and configuration
│   │   └── dags/                         # Airflow DAG definitions
│   └── infra/                            # Infrastructure scripts
│       └── emr/                          # EMR deployment scripts
├── tests/                                # Comprehensive test suite
│   ├── unit/                             # Unit tests (fast, isolated)
│   ├── integration/                      # Integration tests (databases, files)
│   ├── acceptance/                       # End-to-end workflow tests
│   └── conftest.py                       # Shared test configuration
├── examples/                             # Usage examples
├── requirements.txt                      # Production dependencies
├── requirements-test.txt                 # Testing dependencies
├── pytest.ini                            # Pytest configuration
├── setup.py                              # Package configuration
└── README.md                             # This file
- Python 3.8+
- Java 11+ (for PySpark)
- Apache Spark 3.3+
- AWS CLI configured (for deployment)
- Clone the repository:
git clone <repository-url>
cd pyspark-jobs
- Install dependencies:
# Production dependencies
pip install -r requirements.txt
# Development and testing dependencies
pip install -r requirements-test.txt
- Install the package in development mode:
pip install -e .
- Run a specific transformation:
python src/jobs/run_main.py \
  --load_type full \
  --data_set transport-access-node \
  --path s3://your-bucket/data/
- Execute the main ETL pipeline:
python src/jobs/main_collection_data.py
This project includes a comprehensive test suite with three levels of testing:
# Run all tests
pytest
# Run specific test categories
pytest -m unit # Fast unit tests
pytest -m integration # Integration tests
pytest -m acceptance # End-to-end tests
# Run with coverage
pytest --cov=src --cov-report=html
# Run in parallel
pytest -n auto
- Unit Tests (tests/unit/): Fast, isolated component tests
- Integration Tests (tests/integration/): Database and external service tests
- Acceptance Tests (tests/acceptance/): Complete workflow validation
For detailed testing information, see tests/README.md.
The core ETL pipeline (main_collection_data.py) processes data through three stages (sketched after this list):
- Data Extraction: Load from S3 CSV files
- Data Transformation: Apply business logic transformations
- Data Loading: Output to partitioned Parquet files
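A minimal PySpark sketch of these three stages is shown below; the S3 paths, column names, and partition key are placeholders rather than the project's actual schema.
from pyspark.sql import SparkSession, functions as F
# Illustrative only: paths, columns, and the partition key are assumptions.
spark = SparkSession.builder.appName("collection-data-sketch").getOrCreate()
# 1. Extraction: load CSV files from S3
raw_df = spark.read.option("header", True).csv("s3://your-bucket/data/transport-access-node/")
# 2. Transformation: apply business logic
transformed_df = (
    raw_df
    .withColumn("entity", F.trim(F.col("entity")))
    .withColumn("load_date", F.current_date())
)
# 3. Loading: write partitioned Parquet output
(
    transformed_df.write
    .mode("overwrite")
    .partitionBy("load_date")
    .parquet("s3://your-bucket/output/transport-access-node/")
)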
Configure datasets in src/jobs/config/datasets.json:
{
  "transport-access-node": {
    "path": "s3://bucket/transport-access-node-collection/",
    "enabled": true
  },
  "title-boundaries": {
    "path": "s3://bucket/title-boundary-collection/",
    "enabled": false
  }
}
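A job can read this file and skip datasets that are switched off. The snippet below is a sketch of that pattern; it reads the file relative to the repository root, whereas the packaged jobs presumably resolve the path through utils/path_utils.py.
import json
from pathlib import Path
# Sketch: load the dataset configuration and keep only enabled datasets.
config_path = Path("src/jobs/config/datasets.json")
datasets = json.loads(config_path.read_text())
enabled = {name: cfg for name, cfg in datasets.items() if cfg.get("enabled")}
for name, cfg in enabled.items():
    print(f"Processing {name} from {cfg['path']}")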
- Fact Processing: Deduplicate and prioritize fact records (see the sketch after this list)
- Fact Resource Processing: Extract resource relationships
- Entity Processing: Pivot fields into structured entity records
- Issue Processing: Track and validate data quality issues
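As an illustration of the fact processing step, the sketch below keeps one record per fact using a window over an assumed priority column; the column names and sample rows are placeholders, not the project's actual schema.
from pyspark.sql import SparkSession, Window, functions as F
spark = SparkSession.builder.getOrCreate()
# Illustrative fact records; the real pipeline loads these from the collection data.
fact_df = spark.createDataFrame(
    [("fact-1", "entry-a", 2), ("fact-1", "entry-b", 1), ("fact-2", "entry-c", 1)],
    ["fact", "entry", "priority"],
)
# Keep one row per fact, preferring the lowest priority value.
fact_window = Window.partitionBy("fact").orderBy(F.col("priority").asc())
deduplicated_facts = (
    fact_df
    .withColumn("row_num", F.row_number().over(fact_window))
    .filter(F.col("row_num") == 1)
    .drop("row_num")
)
deduplicated_facts.show()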
# AWS Configuration
export AWS_REGION=eu-west-2
export AWS_ACCESS_KEY_ID=your-access-key
export AWS_SECRET_ACCESS_KEY=your-secret-key
# Database Configuration (optional)
export POSTGRES_SECRET_NAME=your-secret-name
export USE_DATABASE=true
# Spark Configuration
export PYSPARK_PYTHON=python3
export SPARK_HOME=/path/to/spark
Use AWS Secrets Manager for secure credential storage:
from utils.aws_secrets_manager import get_database_credentials
# Retrieve database credentials
db_creds = get_database_credentials("myapp/database/postgres")
See examples/secrets_usage_example.py for detailed usage.
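Helpers like this typically wrap AWS Secrets Manager calls made through boto3. The sketch below shows that generic pattern for illustration only; it is not the project's aws_secrets_manager implementation.
import json
import boto3
def fetch_secret(secret_name: str, region: str = "eu-west-2") -> dict:
    """Illustrative helper: fetch and parse a JSON secret with boto3."""
    client = boto3.client("secretsmanager", region_name=region)
    response = client.get_secret_value(SecretId=secret_name)
    return json.loads(response["SecretString"])
db_creds = fetch_secret("myapp/database/postgres")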
- Package the application:
python setup.py bdist_wheel
- Upload to S3:
aws s3 cp dist/pyspark_jobs-*.whl s3://your-bucket/packages/
aws s3 cp src/jobs/run_main.py s3://your-bucket/scripts/
- Submit EMR Serverless job:
aws emr-serverless start-job-run \
  --application-id your-app-id \
  --execution-role-arn your-role-arn \
  --job-driver '{
    "sparkSubmit": {
      "entryPoint": "s3://your-bucket/scripts/run_main.py",
      "sparkSubmitParameters": "--py-files s3://your-bucket/packages/pyspark_jobs-*.whl"
    }
  }'
Deploy DAGs to Amazon MWAA:
aws s3 sync src/airflow/dags/ s3://your-airflow-bucket/dags/
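The DAGs in src/airflow/dags/ orchestrate the EMR Serverless runs. The example below is a hypothetical minimal DAG using the Amazon provider's EmrServerlessStartJobOperator with placeholder IDs and paths; it is not one of the project's shipped DAGs.
from datetime import datetime
from airflow import DAG
from airflow.providers.amazon.aws.operators.emr import EmrServerlessStartJobOperator
# Hypothetical DAG: application ID, role ARN, and S3 paths are placeholders.
with DAG(
    dag_id="collection_data_example",
    start_date=datetime(2024, 1, 1),
    schedule=None,
    catchup=False,
) as dag:
    run_collection_job = EmrServerlessStartJobOperator(
        task_id="run_collection_job",
        application_id="your-app-id",
        execution_role_arn="your-role-arn",
        job_driver={
            "sparkSubmit": {
                "entryPoint": "s3://your-bucket/scripts/run_main.py",
                "sparkSubmitParameters": "--py-files s3://your-bucket/packages/pyspark_jobs.whl",
            }
        },
    )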
Access the Spark UI at http://localhost:4040 during local execution.
EMR Serverless jobs automatically log to CloudWatch under:
/aws/emr-serverless/applications/{application-id}/jobs/{job-run-id}
Structured logging with configurable levels:
import logging
logger = logging.getLogger(__name__)
logger.info("Processing started for dataset: %s", dataset_name)
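Log levels are typically made configurable through an environment variable; the snippet below shows one way to wire that up (the LOG_LEVEL variable name is an assumption, not something the project defines).
import logging
import os
# Assumed convention: a LOG_LEVEL environment variable controls verbosity.
logging.basicConfig(
    level=os.environ.get("LOG_LEVEL", "INFO"),
    format="%(asctime)s %(levelname)s %(name)s - %(message)s",
)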
- Automatic schema inference and validation
- Support for required and optional fields
- Data type enforcement
- Comprehensive data quality checks
- Issue categorization and reporting
- Integration with fact/entity processing
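For example, a required-field check can route failing rows into an issue record set while clean rows continue through the pipeline. The sketch below uses placeholder column names and an invented issue_type label, not the project's actual taxonomy.
from pyspark.sql import SparkSession, functions as F
spark = SparkSession.builder.getOrCreate()
# Illustrative input; the real pipeline reads collection data from S3.
df = spark.createDataFrame(
    [("entity-1", "2024-01-01"), ("entity-2", None)],
    ["entity", "entry_date"],
)
# Rows missing a required field become categorised issue records.
issues_df = (
    df.filter(F.col("entry_date").isNull())
      .withColumn("issue_type", F.lit("missing-value"))
)
valid_df = df.filter(F.col("entry_date").isNotNull())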
- Jobs: Main processing logic in src/jobs/
- Utils: Shared utilities in src/utils/
- Configuration: JSON-based config in src/jobs/config/
- Tests: Comprehensive test suite in tests/
- Create a transformation function in transform_collection_data.py (a skeleton is sketched below)
- Add schema configuration to the config/ directory
- Write comprehensive tests in the appropriate test directory
- Update dataset configuration if needed
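A new transformation would follow the same DataFrame-in, DataFrame-out shape as the existing ones. The skeleton below is hypothetical; the function name and columns are placeholders, not part of transform_collection_data.py.
from pyspark.sql import DataFrame, functions as F
def transform_new_dataset(df: DataFrame) -> DataFrame:
    """Hypothetical skeleton for a new dataset transformation."""
    return (
        df.withColumn("entity", F.trim(F.col("entity")))
          .withColumn("dataset", F.lit("new-dataset"))
    )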
# Run linters
black src/ tests/
flake8 src/ tests/
isort src/ tests/
# Type checking
mypy src/
See the examples/ directory for:
- AWS Secrets Manager usage
- Custom transformation examples
- Configuration templates
- Deployment scripts
- Fork the repository
- Create a feature branch
- Make changes with tests
- Run the test suite
- Submit a pull request
# Install development dependencies
pip install -r requirements-test.txt
# Install pre-commit hooks
pre-commit install
# Run tests before committing
pytest
This project is licensed under the MIT License - see the LICENSE file for details.
For issues and questions:
- Check the tests/README.md for testing guidance
- Review examples in the examples/ directory
- Check documentation in the docs/ directory
- Open an issue on GitHub
The project includes configuration for:
- Automated testing with pytest
- Code quality checks
- AWS deployment pipelines
- Docker containerization support
# Example workflow
name: CI/CD Pipeline
on: [push, pull_request]
jobs:
  test:
    runs-on: ubuntu-latest
    steps:
      - uses: actions/checkout@v3
      - uses: actions/setup-python@v4
      - run: pip install -r requirements-test.txt
      - run: pytest --cov=src
Built with ❤️ for Digital Land data processing.