33
33
API_RESPONSE_TIME = Gauge ('weather_api_response_time' , 'API response time in seconds' )
34
34
DATA_POINTS_EXTRACTED = Gauge ('weather_data_points_extracted' , 'Number of data points extracted' )
35
35
36
- class WeatherDataExtractor :
37
- """Class for extracting weather data from OpenWeatherMap API."""
38
-
39
- def __init__ (self , config_path : str ):
40
- """
41
- Initialize the extractor with configuration.
42
-
43
- Args:
44
- config_path: Path to the configuration file.
45
- """
46
- self .config = self ._load_config (config_path )
47
- self .api_url = self .config ['api' ]['url' ]
48
- self .api_key = os .getenv ('OPENWEATHERMAP_KEY' )
49
- if not self .api_key :
50
- self .api_key = self .config ['api' ]['key' ]
51
- self .cities = self .config ['data' ]['cities' ]
52
- self .timeout = self .config ['pipeline' ]['timeout' ]
53
- self .retry_attempts = self .config ['pipeline' ]['retry_attempts' ]
54
- self .raw_data_path = self .config ['data' ]['raw_data_path' ]
55
-
56
- # Create raw data directory if it doesn't exist
57
- os .makedirs (self .raw_data_path , exist_ok = True )
58
-
59
- def _load_config (self , config_path : str ) -> Dict :
60
- """Load configuration from YAML file."""
61
- with open (config_path , 'r' ) as file :
62
- return yaml .safe_load (file )
63
-
64
- def _make_api_request (self , city : Dict [str , str ]) -> Dict [str , Any ]:
65
- """
66
- Make a request to the OpenWeatherMap API.
67
-
68
- Args:
69
- city: Dictionary containing city name and country code.
70
-
71
- Returns:
72
- Dictionary containing the API response.
73
- """
74
- params = {
75
- 'q' : f"{ city ['name' ]} ,{ city ['country' ]} " ,
76
- 'appid' : self .api_key ,
77
- 'units' : 'metric' # Use metric units (Celsius)
78
- }
79
-
80
- for attempt in range (self .retry_attempts ):
81
- try :
82
- start_time = time .time ()
83
- response = requests .get (
84
- self .api_url ,
85
- params = params ,
86
- timeout = self .timeout
87
- )
88
- response_time = time .time () - start_time
89
- API_RESPONSE_TIME .set (response_time )
90
-
91
- response .raise_for_status () # Raise error for bad status codes
92
- EXTRACT_SUCCESSES .inc ()
93
- return response .json ()
94
- except requests .exceptions .RequestException as e :
95
- logger .warning (f"Attempt { attempt + 1 } failed for { city ['name' ]} : { str (e )} " )
96
- if attempt == self .retry_attempts - 1 :
97
- logger .error (f"Failed to fetch data for { city ['name' ]} after { self .retry_attempts } attempts" )
98
- EXTRACT_FAILURES .inc ()
99
- return {}
100
- time .sleep (2 ) # Wait before retrying
101
-
102
- def extract_current_weather (self ) -> List [Dict [str , Any ]]:
103
- """
104
- Extract current weather data for all configured cities.
105
-
106
- Returns:
107
- List of dictionaries containing weather data for each city.
108
- """
109
- weather_data = []
110
-
111
- for city in self .cities :
112
- logger .info (f"Extracting weather data for { city ['name' ]} , { city ['country' ]} " )
113
-
114
- data = self ._make_api_request (city )
115
- if data :
116
- # Add timestamp and source information
117
- data ['extraction_timestamp' ] = datetime .now ().isoformat ()
118
- data ['city_name' ] = city ['name' ]
119
- data ['country_code' ] = city ['country' ]
120
-
121
- weather_data .append (data )
122
-
123
- # Save raw data
124
- self ._save_raw_data (data , city )
125
-
126
- # Update metrics
127
- DATA_POINTS_EXTRACTED .set (len (weather_data ))
128
-
129
- return weather_data
130
-
131
- def _save_raw_data (self , data : Dict [str , Any ], city : Dict [str , str ]) -> None :
132
- """
133
- Save raw data to JSON file.
134
-
135
- Args:
136
- data: Weather data to save.
137
- city: City information.
138
- """
139
- timestamp = datetime .now ().strftime ("%Y%m%d_%H%M%S" )
140
- filename = f"{ city ['name' ]} _{ city ['country' ]} _{ timestamp } .json"
141
- filepath = os .path .join (self .raw_data_path , filename )
142
-
143
- with open (filepath , 'w' ) as file :
144
- json .dump (data , file , indent = 2 )
145
-
146
- logger .info (f"Saved raw data to { filepath } " )
147
-
148
- if __name__ == "__main__" :
149
- # Configure logging
150
- logging .basicConfig (
151
- level = logging .INFO ,
152
- format = "%(asctime)s - %(name)s - %(levelname)s - %(message)s"
153
- )
154
-
155
- # Test the extractor
156
- extractor = WeatherDataExtractor ("./config/config.yaml" )
157
- data = extractor .extract_current_weather ()
158
- print (f"Extracted weather data for { len (data )} cities" )
159
- EOLcat > src / extract .py << 'EOL'
160
- """
161
- Weather data extraction module.
162
-
163
- Workflow:
164
- 1. Initialize with configuration from YAML file
165
- 2. Connect to OpenWeatherMap API with proper authentication
166
- 3. Fetch current weather data for configured cities
167
- 4. Add metadata including extraction timestamp
168
- 5. Save raw data to JSON files in the raw data directory
169
- 6. Return the extracted data for optional further processing
170
- 7. Handle API errors with retries and proper logging
171
- """
172
- import os
173
- import json
174
- import logging
175
- import requests
176
- from datetime import datetime
177
- import pandas as pd
178
- from typing import Dict , List , Any
179
- import yaml
180
- import time
181
- from dotenv import load_dotenv
182
- from prometheus_client import Counter , Gauge
183
-
184
- # Load environment variables
185
- load_dotenv ()
186
-
187
- logger = logging .getLogger (__name__ )
188
-
189
- # Prometheus metrics
190
- EXTRACT_SUCCESSES = Counter ('weather_extract_successes' , 'Number of successful API extractions' )
191
- EXTRACT_FAILURES = Counter ('weather_extract_failures' , 'Number of failed API extractions' )
192
- API_RESPONSE_TIME = Gauge ('weather_api_response_time' , 'API response time in seconds' )
193
- DATA_POINTS_EXTRACTED = Gauge ('weather_data_points_extracted' , 'Number of data points extracted' )
194
-
195
36
class WeatherDataExtractor :
196
37
"""Class for extracting weather data from OpenWeatherMap API."""
197
38
@@ -314,4 +155,4 @@ def _save_raw_data(self, data: Dict[str, Any], city: Dict[str, str]) -> None:
314
155
# Test the extractor
315
156
extractor = WeatherDataExtractor ("./config/config.yaml" )
316
157
data = extractor .extract_current_weather ()
317
- print (f"Extracted weather data for { len (data )} cities" )
158
+ print (f"Extracted weather data for { len (data )} cities" )
0 commit comments