PySpark Jobs for Digital Land

A comprehensive PySpark data processing framework designed for Amazon EMR Serverless with Apache Airflow integration. This project provides scalable ETL pipelines for processing and transforming digital land data collections.

πŸ—οΈ Project Overview

This repository contains PySpark jobs that process various digital land datasets including:

  • Transport access nodes
  • Title boundaries
  • Entity data transformations
  • Fact and fact resource processing
  • Issue tracking and validation

Key Features

  • βœ… EMR Serverless Ready: Optimized for AWS EMR Serverless execution
  • βœ… Airflow Integration: DAGs for orchestrating data workflows
  • βœ… Modular Design: Reusable transformation components
  • βœ… Comprehensive Testing: Unit, integration, and acceptance tests with pytest
  • βœ… Configuration Management: JSON-based dataset and schema configuration
  • βœ… AWS Secrets Integration: Secure credential management
  • βœ… Multiple Output Formats: Support for Parquet, CSV, and database outputs

πŸ“ Project Structure

pyspark-jobs/
β”œβ”€β”€ src/                          # Source code
β”‚   β”œβ”€β”€ jobs/                     # Core PySpark job modules
β”‚   β”‚   β”œβ”€β”€ main_collection_data.py      # Main ETL pipeline
β”‚   β”‚   β”œβ”€β”€ transform_collection_data.py # Data transformation logic
β”‚   β”‚   β”œβ”€β”€ run_main.py                  # EMR entry point script
β”‚   β”‚   β”œβ”€β”€ config/                      # Configuration files
β”‚   β”‚   β”‚   β”œβ”€β”€ datasets.json           # Dataset definitions
β”‚   β”‚   β”‚   └── transformed_source.json # Schema configurations
β”‚   β”‚   └── dbaccess/                    # Database connectivity modules
β”‚   β”œβ”€β”€ utils/                    # Utility modules
β”‚   β”‚   β”œβ”€β”€ aws_secrets_manager.py      # AWS Secrets Manager integration
β”‚   β”‚   └── path_utils.py               # Path resolution utilities
β”‚   β”œβ”€β”€ airflow/                  # Airflow DAGs and configuration
β”‚   β”‚   └── dags/                       # Airflow DAG definitions
β”‚   └── infra/                    # Infrastructure scripts
β”‚       └── emr/                        # EMR deployment scripts
β”œβ”€β”€ tests/                        # Comprehensive test suite
β”‚   β”œβ”€β”€ unit/                     # Unit tests (fast, isolated)
β”‚   β”œβ”€β”€ integration/              # Integration tests (databases, files)
β”‚   β”œβ”€β”€ acceptance/               # End-to-end workflow tests
β”‚   └── conftest.py              # Shared test configuration
β”œβ”€β”€ examples/                     # Usage examples
β”œβ”€β”€ requirements.txt              # Production dependencies
β”œβ”€β”€ requirements-test.txt         # Testing dependencies
β”œβ”€β”€ pytest.ini                   # Pytest configuration
β”œβ”€β”€ setup.py                     # Package configuration
└── README.md                    # This file

πŸš€ Quick Start

Prerequisites

  • Python 3.8+
  • Java 11+ (for PySpark)
  • Apache Spark 3.3+
  • AWS CLI configured (for deployment)

Installation

  1. Clone the repository:
git clone <repository-url>
cd pyspark-jobs
  2. Install dependencies:
# Production dependencies
pip install -r requirements.txt

# Development and testing dependencies
pip install -r requirements-test.txt
  3. Install the package in development mode:
pip install -e .

Running Locally

  1. Run a specific transformation:
python src/jobs/run_main.py \
  --load_type full \
  --data_set transport-access-node \
  --path s3://your-bucket/data/
  2. Execute the main ETL pipeline:
python src/jobs/main_collection_data.py

πŸ§ͺ Testing

This project includes a comprehensive test suite with three levels of testing:

Running Tests

# Run all tests
pytest

# Run specific test categories
pytest -m unit                    # Fast unit tests
pytest -m integration             # Integration tests
pytest -m acceptance              # End-to-end tests

# Run with coverage
pytest --cov=src --cov-report=html

# Run in parallel
pytest -n auto

Test Structure

  • Unit Tests (tests/unit/): Fast, isolated component tests
  • Integration Tests (tests/integration/): Database and external service tests
  • Acceptance Tests (tests/acceptance/): Complete workflow validation

For detailed testing information, see tests/README.md.
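
The markers used above are registered in pytest.ini; a minimal marked unit test might look like the following (the assertion is purely illustrative).

import pytest

@pytest.mark.unit
def test_dataset_name_is_normalised():
    # Illustrative check only; real tests exercise the transformation modules
    assert "Transport-Access-Node".lower() == "transport-access-node"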

πŸ“Š Data Processing Workflows

Main ETL Pipeline

The core ETL pipeline (main_collection_data.py) processes data through these stages:

  1. Data Extraction: Load from S3 CSV files
  2. Data Transformation: Apply business logic transformations
  3. Data Loading: Output to partitioned Parquet files
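
A condensed sketch of these three stages is shown below; the bucket, paths, and column names are placeholders, and the transformation step stands in for the real logic in transform_collection_data.py.

from pyspark.sql import SparkSession
from pyspark.sql import functions as F

spark = SparkSession.builder.appName("collection-data").getOrCreate()

# 1. Data Extraction: read the raw collection CSV from S3 (placeholder bucket and path)
raw_df = spark.read.option("header", True).csv("s3://your-bucket/transport-access-node-collection/")

# 2. Data Transformation: stand-in for the functions in transform_collection_data.py
transformed_df = raw_df.withColumn("entry_date", F.to_date(F.col("entry-date")))

# 3. Data Loading: write partitioned Parquet output (partition column is illustrative)
transformed_df.write.mode("overwrite").partitionBy("dataset").parquet("s3://your-bucket/output/")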

Supported Datasets

Configure datasets in src/jobs/config/datasets.json:

{
  "transport-access-node": {
    "path": "s3://bucket/transport-access-node-collection/",
    "enabled": true
  },
  "title-boundaries": {
    "path": "s3://bucket/title-boundary-collection/", 
    "enabled": false
  }
}
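
A job can pick up this configuration with the standard library; the snippet below is a minimal sketch, and the project's actual loading helper (for example via path_utils) may differ.

import json
from pathlib import Path

# Load dataset definitions and keep only the enabled ones
config_path = Path("src/jobs/config/datasets.json")
datasets = json.loads(config_path.read_text())
enabled = {name: cfg for name, cfg in datasets.items() if cfg.get("enabled")}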

Transformation Types

  • Fact Processing: Deduplicate and prioritize fact records (see the sketch after this list)
  • Fact Resource Processing: Extract resource relationships
  • Entity Processing: Pivot fields into structured entity records
  • Issue Processing: Track and validate data quality issues
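
As an illustration of the fact-processing step, deduplication can be expressed with a window function; facts_df and the column names below are assumptions rather than the project's actual schema.

from pyspark.sql import Window
from pyspark.sql import functions as F

# Keep one row per fact, preferring higher priority and the most recent entry
w = Window.partitionBy("fact").orderBy(F.col("priority").asc(), F.col("entry_date").desc())
deduplicated_df = (
    facts_df.withColumn("rn", F.row_number().over(w))
    .filter(F.col("rn") == 1)
    .drop("rn")
)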

πŸ”§ Configuration

Environment Variables

# AWS Configuration
export AWS_REGION=eu-west-2
export AWS_ACCESS_KEY_ID=your-access-key
export AWS_SECRET_ACCESS_KEY=your-secret-key

# Database Configuration (optional)
export POSTGRES_SECRET_NAME=your-secret-name
export USE_DATABASE=true

# Spark Configuration
export PYSPARK_PYTHON=python3
export SPARK_HOME=/path/to/spark

AWS Secrets Manager

Use AWS Secrets Manager for secure credential storage:

from utils.aws_secrets_manager import get_database_credentials

# Retrieve database credentials
db_creds = get_database_credentials("myapp/database/postgres")

See examples/secrets_usage_example.py for detailed usage.
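
Under the hood such a helper typically wraps boto3; a minimal sketch (error handling omitted, and the real module may differ) looks like this.

import json
import boto3

def get_database_credentials(secret_name, region_name="eu-west-2"):
    # Fetch a JSON secret from AWS Secrets Manager and return it as a dict
    client = boto3.client("secretsmanager", region_name=region_name)
    response = client.get_secret_value(SecretId=secret_name)
    return json.loads(response["SecretString"])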

🚁 Deployment

EMR Serverless Deployment

  1. Package the application:
python setup.py bdist_wheel
  2. Upload to S3:
aws s3 cp dist/pyspark_jobs-*.whl s3://your-bucket/packages/
aws s3 cp src/jobs/run_main.py s3://your-bucket/scripts/
  3. Submit EMR Serverless job:
aws emr-serverless start-job-run \
  --application-id your-app-id \
  --execution-role-arn your-role-arn \
  --job-driver '{
    "sparkSubmit": {
      "entryPoint": "s3://your-bucket/scripts/run_main.py",
      "sparkSubmitParameters": "--py-files s3://your-bucket/packages/pyspark_jobs-*.whl"
    }
  }'

Airflow Integration

Deploy DAGs to Amazon MWAA:

aws s3 sync src/airflow/dags/ s3://your-airflow-bucket/dags/
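
For reference, a minimal DAG that starts an EMR Serverless job run could look like the sketch below; the application ID, role ARN, and S3 paths are placeholders, and the real DAGs in src/airflow/dags/ may be organised differently.

from datetime import datetime

from airflow import DAG
from airflow.providers.amazon.aws.operators.emr import EmrServerlessStartJobOperator

with DAG(
    dag_id="pyspark_collection_data",
    start_date=datetime(2024, 1, 1),
    schedule=None,
    catchup=False,
) as dag:
    run_etl = EmrServerlessStartJobOperator(
        task_id="run_collection_etl",
        application_id="your-app-id",
        execution_role_arn="your-role-arn",
        job_driver={
            "sparkSubmit": {
                "entryPoint": "s3://your-bucket/scripts/run_main.py",
                "sparkSubmitParameters": "--py-files s3://your-bucket/packages/pyspark_jobs-*.whl",
            }
        },
    )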

πŸ“ˆ Monitoring and Logging

Spark UI

Access Spark UI at http://localhost:4040 during local execution.

CloudWatch Logs

EMR Serverless jobs automatically log to CloudWatch under:

  • /aws/emr-serverless/applications/{application-id}/jobs/{job-run-id}

Application Logs

Structured logging with configurable levels:

import logging
logger = logging.getLogger(__name__)
logger.info("Processing started for dataset: %s", dataset_name)
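
Log level and format can be adjusted with standard logging configuration; a generic example (not project-specific) is shown below.

import logging

logging.basicConfig(
    level=logging.INFO,
    format="%(asctime)s %(levelname)s %(name)s - %(message)s",
)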

πŸ” Data Quality

Schema Validation

  • Automatic schema inference and validation
  • Support for required and optional fields
  • Data type enforcement
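
For example, an explicit schema with required and optional fields can be declared as below; the field names are illustrative, and spark is assumed to be an active SparkSession.

from pyspark.sql.types import StructType, StructField, StringType, DateType

entity_schema = StructType([
    StructField("entity", StringType(), nullable=False),    # required field
    StructField("dataset", StringType(), nullable=False),   # required field
    StructField("entry_date", DateType(), nullable=True),   # optional field
])

df = spark.read.schema(entity_schema).option("header", True).csv("s3://your-bucket/data/")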

Issue Tracking

  • Comprehensive data quality checks
  • Issue categorization and reporting
  • Integration with fact/entity processing

πŸ› οΈ Development

Code Structure

  • Jobs: Main processing logic in src/jobs/
  • Utils: Shared utilities in src/utils/
  • Configuration: JSON-based config in src/jobs/config/
  • Tests: Comprehensive test suite in tests/

Adding New Transformations

  1. Create a transformation function in transform_collection_data.py (see the skeleton below)
  2. Add schema configuration to config/ directory
  3. Write comprehensive tests in appropriate test directory
  4. Update dataset configuration if needed
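
A skeleton for step 1, assuming the common pattern of a function that takes and returns a DataFrame; the name, columns, and logic are illustrative, not the project's actual interface.

from pyspark.sql import DataFrame
from pyspark.sql import functions as F

def transform_my_new_dataset(df: DataFrame) -> DataFrame:
    """Illustrative transformation: normalise a column name and drop empty entities."""
    return (
        df.withColumnRenamed("entry-date", "entry_date")
        .filter(F.col("entity").isNotNull())
    )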

Code Quality

# Run linters
black src/ tests/
flake8 src/ tests/
isort src/ tests/

# Type checking
mypy src/

πŸ“š Examples

See the examples/ directory for:

  • AWS Secrets Manager usage
  • Custom transformation examples
  • Configuration templates
  • Deployment scripts

🀝 Contributing

  1. Fork the repository
  2. Create a feature branch
  3. Make changes with tests
  4. Run the test suite
  5. Submit a pull request

Development Setup

# Install development dependencies
pip install -r requirements-test.txt

# Install pre-commit hooks
pre-commit install

# Run tests before committing
pytest

πŸ“„ License

This project is licensed under the MIT License - see the LICENSE file for details.

πŸ†˜ Support

For issues and questions:

  1. Check the tests/README.md for testing guidance
  2. Review examples in the examples/ directory
  3. Check the documentation in the docs/ directory
  4. Open an issue on GitHub

πŸ”„ CI/CD

The project includes configuration for:

  • Automated testing with pytest
  • Code quality checks
  • AWS deployment pipelines
  • Docker containerization support

GitHub Actions (if configured)

# Example workflow
name: CI/CD Pipeline
on: [push, pull_request]
jobs:
  test:
    runs-on: ubuntu-latest
    steps:
      - uses: actions/checkout@v3
      - uses: actions/setup-python@v4
      - run: pip install -r requirements-test.txt
      - run: pytest --cov=src

Built with ❀️ for Digital Land data processing pyspark-jobs repo for pyspark jobs. added code for issue table, fact-res, fact tables. main
