ellweaver/ainsdale-beach-etl

Ainsdale-Beach-ETL


Summary

The Ainsdale Beach ETL project is an Extract-Transform-Load pipeline written in Python and hosted in AWS. Its primary function is to ingest data from an OLTP-optimised database and transform it for an OLAP-optimised data warehouse. The main objectives of this project are to handle regular data updates and to deliver performant, test-driven (TDD) code, producing an effective, usable star-schema output with robust versioning. As an additional feature, an S3 bucket mirrors the processed data in Parquet format, permitting further analysis better suited to that format. Our aim is to enable analysis of important business data without compromising the performance of day-to-day transactional operations.

Prerequisites

Before you begin, ensure you have met the following requirements:

  • Python V3.13

  • Terraform V1.5

  • Git for version control

  • AWS CLI (optional but recommended)

Development setup

  1. Clone the repository:
    git clone https://github.com/your-org/ainsdale-beach-etl.git
    cd ainsdale-beach-etl
  2. Set up the Python TDD environment:
    python -m venv venv
    source venv/bin/activate
    pip install -r requirements.txt
  3. Set up AWS credentials. If your AWS credentials are saved locally:
    aws configure

    or set the environment variables:

    export AWS_ACCESS_KEY_ID=<Your-Access-Key-ID>
    export AWS_SECRET_ACCESS_KEY=<Your-Secret-Access-Key>
    export AWS_DEFAULT_REGION=<region>
  4. Run the setup and tests:
    make run-all
  5. Deploy:
    cd terraform
    terraform init
    terraform plan
    terraform apply

Extract

Our initial data is stored in an RDS database containing 11 tables of normalised OLTP data. Our extract function ingests each table and converts it to .csv format using the Polars library. Using boto3, each .csv file is uploaded to an S3 bucket under a key containing the ingestion timestamp and the original table name. This key convention enables simple versioning and reduces the chance of mishandling data.

Our key produces a pseudo file system based on year, month, day and a timestamp. The timestamp is then used as a batch id and each individual file is labelled with this id:

upload_file(
    s3_client,
    file=out_buffer.getvalue(),
    bucket_name=bucket,
    key=f"data/{current_year}/{current_month}/{current_day}/{time_now}/{time_now}_{table}.csv",
)
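As a minimal sketch of this key convention, the path components can be derived from a single ingestion timestamp (the function name, timestamp format, and zero-padding below are assumptions for illustration, not the project's actual code):

```python
from datetime import datetime

def build_key(table: str, now: datetime) -> str:
    # Pseudo file system: data/<year>/<month>/<day>/<batch_id>/<batch_id>_<table>.csv
    # where the batch id is the ingestion timestamp (format assumed here).
    batch_id = now.strftime("%H-%M-%S")
    return (
        f"data/{now.year}/{now.month:02d}/{now.day:02d}/"
        f"{batch_id}/{batch_id}_{table}.csv"
    )

print(build_key("sales_order", datetime(2025, 3, 1, 9, 30, 0)))
# → data/2025/03/01/09-30-00/09-30-00_sales_order.csv
```

Because every file in a batch shares the same batch id, a whole ingestion run can be listed or replayed with a single key prefix.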

Transform

The purpose of our transform function is to reshape the ingested S3 data into an OLAP-optimised format. Polars performantly reshapes the 11 initial .csv files into 7 dataframes that form the basis of a star schema. During the transformation process, relevant dataframes are held in dictionaries, which facilitates easy looping and improves readability. PyArrow converts the dataframes to .parquet:

for table, value in processed_dict.items():
    out_buffer = BytesIO()
    value.write_parquet(out_buffer)

The transformed files are uploaded to the destination bucket using boto3:

upload_file(
    s3_client,
    file=out_buffer.getvalue(),
    bucket_name=destination_bucket,
    key=f"{key}{batch_id}_{table}.parquet",
)
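To illustrate the kind of reshaping involved, here is a minimal pure-Python sketch of deriving a dimension table with surrogate keys from denormalised rows (the function, field names, and data are hypothetical, not the project's code):

```python
def build_dimension(rows: list[dict], natural_key: str) -> dict[str, int]:
    # Assign an incrementing surrogate key to each distinct natural-key value,
    # as a star-schema dimension table would; fact rows then reference these keys.
    dimension: dict[str, int] = {}
    for row in rows:
        value = row[natural_key]
        if value not in dimension:
            dimension[value] = len(dimension) + 1
    return dimension

orders = [
    {"order_id": 1, "currency": "GBP"},
    {"order_id": 2, "currency": "USD"},
    {"order_id": 3, "currency": "GBP"},
]
print(build_dimension(orders, "currency"))
# → {'GBP': 1, 'USD': 2}
```

In the pipeline itself this deduplication and keying is done with Polars expressions over whole dataframes rather than row-by-row.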

Load

Once our data has been transformed and uploaded to S3, the load function uses that data to maintain the data warehouse. The load function pulls an existing copy of the data warehouse into memory and compares it to the data uploaded to S3 by the previous transform step. The row-count difference is saved as tail, and the corresponding new rows are appended to the appropriate table in the data warehouse:

    transformed_df = pl.read_parquet(file["body"])               # S3 data
    db_df = pl.read_database_uri("SELECT * FROM " + table, conn) # data from warehouse
    db_df_shape = db_df.shape[0]                                 # row count of warehouse table
    transformed_shape = transformed_df.shape[0]                  # row count of table in S3
    tail = transformed_shape - db_df_shape                       # number of new rows in S3
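Assuming the warehouse tables are append-only, the tail count can then be used to slice off just the new rows. A minimal sketch with plain lists (a hypothetical helper, not the project's code):

```python
def new_rows(transformed: list, warehouse_row_count: int) -> list:
    # Rows present in the transformed S3 data but not yet in the warehouse:
    # the last `tail` rows, or nothing if the warehouse is already up to date.
    tail = len(transformed) - warehouse_row_count
    return transformed[-tail:] if tail > 0 else []

print(new_rows(["r1", "r2", "r3", "r4"], 2))
# → ['r3', 'r4']
```

This slice-by-count approach relies on rows never being deleted or reordered upstream; a change-data-capture comparison would be needed otherwise.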

Data Visualisation

With our data warehouse now populated, we connected Google Looker to perform preliminary data visualisation. We created a dataset based on fact_sales_order and its adjoining dim tables to generate visualisations of typical key queries, including:

  • total units sold since records began
  • units sold on a monthly basis, per year
  • distribution of currencies used by customers
  • units sold of each design
  • deliveries per country

Hosting

Our infrastructure has been provisioned in AWS, employing the following services:

  • RDS for Data warehouse
  • IAM roles for permissions and policies
  • Step Functions for orchestration
  • Lambda for functions
  • Secrets Manager for AWS secret credentials
  • CloudWatch for logging & alarms
  • S3 for outputs
  • Amazon EventBridge Scheduler for scheduling events
  • SNS for alerts

Orchestration

Our project employs GitHub Actions to adhere to CI/CD (Continuous Integration / Continuous Deployment) principles. GitHub Actions enables robust, automated testing and deployment. Using our Makefile, checks for functionality (pytest), formatting (black), coverage (coverage), and security (bandit) can all be run before any resources are deployed or changes are made to the production code. If those checks all pass, the infrastructure is automatically deployed by Terraform into the AWS cloud. This helps maintain the functionality and consistency of the code in a continuous development process.

Terraform Structure

Terraform deploys our infrastructure in AWS. This allows for scalable and granular control of infrastructure and easy cooperation between developers on the project. Terraform is also the basis of our CI/CD infrastructure. The file structure of our terraform directory is relatively conventional:

terraform/
├── cloudwatch.tf        # Logging & alarms
├── data.tf              # IAM identity data
├── events.tf            # Step function rules
├── iam.tf               # Roles & policies & permissions
├── lambda.tf            # Lambda functions and layers
├── s3.tf                # Buckets
├── sns.tf               # Topics & subscriptions
├── stepfunctions.tf     # State machine for step function
├── vars.tf              # Variables
└── main.tf              # Backend & provider

The terraform.tfstate file is held in S3 to better support remote collaboration and to allow easy switching of the backend state file.

Resources provisioned outside Terraform:

  • backend bucket:
    • tf state
  • Lambda buckets:
    • python
    • layers
  • Amazon Elastic Container Registry (ECR)

Logging

CloudWatch retains our project's logs, and error messages are delivered via SNS.

Requirements

Python version 3.13. Major packages include:

  • Polars
  • Pyarrow
  • Boto3
  • Black
  • Coverage
  • Pytest
  • Bandit
  • adbc
  • pg8000
  • sqlalchemy

A full list of packages and dependencies can be found in requirements.txt.

Contribution

To contribute to ainsdale-beach-etl, follow these steps:

  1. Fork this repository.
  2. Create a branch: git checkout -b <branch_name>.
  3. Make your changes and commit them: git commit -m '<commit_message>'
  4. Push to your branch: git push origin <branch_name>
  5. Create the pull request.

Contributors

Thanks to the following people who have contributed to this project:

License

This project uses the following license: MIT.
