## CDC pipeline for NOAA CO2 Data in Snowflake using AWS Lambda

A comprehensive data pipeline for processing, analyzing, and visualizing CO2 measurement data using Snowflake's modern data stack.

## Project Overview

This project implements an incremental data pipeline that processes CO2 concentration measurements from the Mauna Loa Observatory. The pipeline extracts data from public sources, loads it into Snowflake, and builds harmonized and analytical data layers to enable analysis of CO2 trends over time.

## Architecture

The pipeline follows a multi-layer data architecture:

1. **Raw Layer** - Contains raw CO2 data loaded directly from source files
2. **Harmonized Layer** - Standardized data with consistent formatting and data quality checks
3. **Analytics Layer** - Derived tables with aggregations, metrics, and enriched attributes for analysis

### Key Components

- **Raw Data Ingestion** - Loads CO2 data from S3 into the raw layer
- **Change Data Capture** - Uses Snowflake streams to track changes in the raw data
- **Harmonization** - Transforms raw data into a consistent format
- **Analytics Processing** - Calculates trends, aggregations, and derived metrics
- **UDFs** - Custom functions for CO2 calculations such as volatility and daily/weekly changes (see the sketch below)

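For illustration, a daily-change UDF of this kind reduces to a small Python function registered through Snowpark. This is a minimal sketch: the function name, column semantics, and registration options are assumptions, and the project's actual implementations live under `udfs_and_spoc/`.

```python
from typing import Optional

from snowflake.snowpark import Session
from snowflake.snowpark.types import FloatType


def co2_daily_percent_change(previous_ppm: Optional[float],
                             current_ppm: Optional[float]) -> Optional[float]:
    """Day-over-day change in CO2 concentration, as a percentage of the prior reading."""
    if previous_ppm is None or current_ppm is None or previous_ppm == 0:
        return None
    return (current_ppm - previous_ppm) / previous_ppm * 100.0


def register_udf(session: Session) -> None:
    # Registration sketch: the UDF name and stage are placeholders.
    session.udf.register(
        co2_daily_percent_change,
        name="CO2_DAILY_PERCENT_CHANGE",
        return_type=FloatType(),
        input_types=[FloatType(), FloatType()],
        is_permanent=True,
        stage_location="@UDF_STAGE",
        replace=True,
    )
```
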
## Technologies

- **Snowflake** - Cloud data warehouse
- **Python** - Primary programming language
- **Snowpark** - Snowflake's Python API for data processing
- **GitHub Actions** - CI/CD pipeline
- **AWS S3** - Data storage for source files
- **pytest** - Testing framework

## Setup and Installation

### Prerequisites

- Python 3.10 or later
- Snowflake account
- AWS account with access to S3 buckets
- RSA key pair for Snowflake authentication

### Local Environment Setup

1. Clone the repository:

```bash
git clone <repository-url>
cd Incremental_DataPipleine_using_Snowflake
```

2. Create and activate a virtual environment:

```bash
python -m venv venv
source venv/bin/activate  # On Windows: venv\Scripts\activate
```

3. Install dependencies:

```bash
pip install -r requirements.txt
```

4. Set up RSA key pair authentication:

```bash
mkdir -p ~/.snowflake/keys
python scripts/rsa_key_pair_authentication/generate_snowflake_keys.py
```

5. Configure the Snowflake connection by creating `~/.snowflake/connections.toml` (a quick connectivity check is sketched after this list):

```toml
[dev]
account = "your-account"
user = "your-username"
private_key_path = "~/.snowflake/keys/rsa_key.p8"
warehouse = "CO2_WH_DEV"
role = "CO2_ROLE_DEV"
database = "CO2_DB_DEV"
schema = "RAW_CO2"
client_request_mfa_token = false

[prod]
account = "your-account"
user = "your-username"
private_key_path = "~/.snowflake/keys/rsa_key.p8"
warehouse = "CO2_WH_PROD"
role = "CO2_ROLE_PROD"
database = "CO2_DB_PROD"
schema = "RAW_CO2"
client_request_mfa_token = false
```

6. Create a `.env` file with the following variables:

```
AWS_ACCESS_KEY=<your-access-key>
AWS_SECRET_KEY=<your-secret-key>
AWS_REGION=<your-region>
S3_BUCKET_NAME=noa-co2-datapipeline
PARENT_FOLDER=noaa-co2-data
SNOWFLAKE_ENV=dev
```

7. Create a `templates/environment.json` file:

```json
{
  "environment": "dev"
}
```

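Once the profiles exist, a few lines of Snowpark can confirm the `dev` connection works. This is a convenience sketch, assuming a snowflake-snowpark-python version that reads `connections.toml` profiles via the `connection_name` option:

```python
from snowflake.snowpark import Session

# Build a session from the [dev] profile in ~/.snowflake/connections.toml.
session = Session.builder.config("connection_name", "dev").create()
print(session.sql("SELECT CURRENT_USER(), CURRENT_ROLE(), CURRENT_WAREHOUSE()").collect())
session.close()
```
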
### Snowflake Setup

1. Register the public key with your Snowflake user:

```sql
ALTER USER YourUsername SET RSA_PUBLIC_KEY='<public-key-string>';
```

2. Create required Snowflake resources:

```bash
python scripts/deployment_files/snowflake_deployer.py sql --profile dev --file scripts/setup_dev.sql
```

## Project Structure

```
Incremental_DataPipleine_using_Snowflake/
├── .github/workflows/                   # CI/CD workflows
├── scripts/                             # Utility scripts
│   ├── deployment_files/                # Deployment automation
│   ├── raw data loading/                # Data ingestion scripts
│   └── rsa_key_pair_authentication/     # Authentication utilities
├── templates/                           # Configuration templates
├── tests/                               # Test suite
├── udfs_and_spoc/                       # User-Defined Functions and Stored Procedures
│   ├── co2_analytical_sp/               # Analytics stored procedure
│   ├── co2_harmonized_sp/               # Harmonization stored procedure
│   ├── daily_co2_changes/               # UDF for daily CO2 changes
│   ├── loading_co2_data_sp/             # Data loading stored procedure
│   ├── python_udf/                      # Custom Python UDFs
│   └── weekly_co2_changes/              # UDF for weekly CO2 changes
├── .env                                 # Environment variables (not in repo)
├── deployment_and_key_workflow.md       # Deployment documentation
├── pyproject.toml                       # Project metadata and dependencies
├── pytest.ini                           # pytest configuration
├── README.md                            # This file
├── requirements.txt                     # Python dependencies
└── rsa_key_pair_generator.md            # RSA key setup instructions
```

## Usage

### Loading Raw Data

```bash
python scripts/raw\ data\ loading\ and\ stream\ creation/raw_co2_data.py
```

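Conceptually, this script fetches the NOAA Mauna Loa measurements and stages them in the S3 bucket configured in `.env`. The sketch below is an assumption about its overall shape (the source URL and object key are placeholders); the authoritative logic is in the script above:

```python
import os

import boto3
import requests
from dotenv import load_dotenv

load_dotenv()  # reads the AWS_* and S3_* settings from .env

NOAA_CO2_URL = "<noaa-daily-co2-url>"  # placeholder; the real URL is defined in the script

# Download the daily CO2 measurements file.
response = requests.get(NOAA_CO2_URL, timeout=60)
response.raise_for_status()

# Stage the raw file in the configured S3 location.
s3 = boto3.client(
    "s3",
    aws_access_key_id=os.environ["AWS_ACCESS_KEY"],
    aws_secret_access_key=os.environ["AWS_SECRET_KEY"],
    region_name=os.environ["AWS_REGION"],
)
s3.put_object(
    Bucket=os.environ["S3_BUCKET_NAME"],
    Key=f"{os.environ['PARENT_FOLDER']}/co2_daily.csv",
    Body=response.content,
)
```
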
### Creating Streams for Change Data Capture

```bash
python scripts/raw\ data\ loading\ and\ stream\ creation/02_create_rawco2data_stream.py
```

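The stream is what makes the pipeline incremental: downstream procedures only see rows added since the stream was last consumed. A minimal sketch of the DDL this step issues, with the table and stream names assumed from the RAW_CO2 schema:

```python
from snowflake.snowpark import Session

session = Session.builder.config("connection_name", "dev").create()

# Track new rows landing in the raw table (object names are assumptions for this sketch).
session.sql(
    """
    CREATE OR REPLACE STREAM RAW_CO2.CO2_DATA_STREAM
    ON TABLE RAW_CO2.CO2_DATA
    APPEND_ONLY = TRUE
    """
).collect()
session.close()
```
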
### Running Tests

```bash
pytest tests/
```

### Deploying Components to Snowflake

Deploy all components:

```bash
python scripts/deployment_files/snowflake_deployer.py deploy-all --profile dev --path udfs_and_spoc --check-changes
```

Deploy a specific component:

```bash
python scripts/deployment_files/snowflake_deployer.py deploy --profile dev --path udfs_and_spoc/co2_harmonized_sp --name HARMONIZE_CO2_DATA --type procedure
```

## Data Flow

1. **Data Ingestion**: CO2 data is loaded from S3 into the RAW_CO2 schema
2. **Change Detection**: Snowflake streams track changes in the raw data
3. **Harmonization**: The HARMONIZE_CO2_DATA stored procedure transforms raw data into the harmonized layer (sketched below)
4. **Analytics**: The ANALYZE_CO2_DATA stored procedure creates analytical tables and views
5. **User Access**: End users query the analytics layer for insights

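The harmonization step is essentially a stream-driven merge: when the stream reports new rows, they are upserted into the harmonized table. The Snowpark sketch below illustrates that pattern; the table, stream, and column names (`DATE`, `CO2_PPM`) are assumptions, and the project's actual procedure lives in `udfs_and_spoc/co2_harmonized_sp/`.

```python
from snowflake.snowpark import Session
from snowflake.snowpark.functions import when_matched, when_not_matched


def harmonize_co2_data(session: Session) -> str:
    # Skip the merge entirely when the stream has captured nothing new.
    has_data = session.sql(
        "SELECT SYSTEM$STREAM_HAS_DATA('RAW_CO2.CO2_DATA_STREAM')"
    ).collect()[0][0]
    if not has_data:
        return "No new data to harmonize"

    source = session.table("RAW_CO2.CO2_DATA_STREAM")
    target = session.table("HARMONIZED_CO2.HARMONIZED_CO2")

    # Upsert new or changed readings keyed on the measurement date.
    target.merge(
        source,
        target["DATE"] == source["DATE"],
        [
            when_matched().update({"CO2_PPM": source["CO2_PPM"]}),
            when_not_matched().insert({"DATE": source["DATE"], "CO2_PPM": source["CO2_PPM"]}),
        ],
    )
    return "Harmonization complete"
```
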
## Authentication

The project uses RSA key pair authentication for secure, MFA-free deployments to Snowflake:

1. Generate an RSA key pair using the provided scripts
2. Register the public key with your Snowflake user
3. Configure the connection profile to use the private key
4. Store the private key securely in GitHub secrets for CI/CD (see the sketch below)

For detailed instructions, see `rsa_key_pair_generator.md` and `deployment_and_key_workflow.md`.

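In CI/CD, where no `connections.toml` is present, the deployment tooling typically builds the connection directly from the private key. A sketch of what that looks like with the Python connector and the `cryptography` package (account and user values are placeholders):

```python
from pathlib import Path

import snowflake.connector
from cryptography.hazmat.primitives import serialization

# Load the PKCS#8 private key and convert it to the DER bytes the connector expects.
key_bytes = Path("~/.snowflake/keys/rsa_key.p8").expanduser().read_bytes()
private_key = serialization.load_pem_private_key(key_bytes, password=None)
private_key_der = private_key.private_bytes(
    encoding=serialization.Encoding.DER,
    format=serialization.PrivateFormat.PKCS8,
    encryption_algorithm=serialization.NoEncryption(),
)

conn = snowflake.connector.connect(
    account="your-account",
    user="your-username",
    private_key=private_key_der,
    warehouse="CO2_WH_DEV",
    role="CO2_ROLE_DEV",
)
print(conn.cursor().execute("SELECT CURRENT_USER()").fetchone())
conn.close()
```
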
## CI/CD Pipeline

The GitHub Actions workflow automates:

1. Testing of all components
2. Deployment to dev/prod environments based on branch
3. Key-based authentication to Snowflake
4. Validation of code quality and functionality

## Troubleshooting

Common issues and solutions:

- **Authentication Errors**: Verify key permissions and format
- **Deployment Failures**: Check function signatures and parameter counts
- **Connection Issues**: Run `python scripts/deployment_files/check_connections_file.py` to validate your connections.toml

## Contributing

1. Fork the repository
2. Create a feature branch
3. Make your changes
4. Run tests to ensure functionality
5. Submit a pull request

## License

[Specify your license here]

## Acknowledgments

- NOAA for providing the CO2 measurement data
- Snowflake for the data warehousing platform