|
1 | 1 | import re
|
2 |
| -import boto3 |
| 2 | + |
3 | 3 | import pandas as pd
|
4 | 4 | import streamlit as st
|
5 | 5 |
|
6 |
| -S3_BUCKET = "caltrans-pems-prd-us-west-2-marts" |
7 |
| -STATIONS_METADATA_KEY = "geo/current_stations.parquet" |
8 |
| -DATA_PREFIX = "imputation/detector_imputed_agg_five_minutes" |
| 6 | +from pems_data.stations import StationsBucket |
| 7 | + |
| 8 | + |
| 9 | +BUCKET = StationsBucket() |
9 | 10 |
|
10 | 11 |
|
@st.cache_data(ttl=3600)  # Cache for 1 hour
def load_station_metadata(district_number: str) -> pd.DataFrame:
    """Fetch metadata for every station in the selected District.

    Delegates the S3 read to ``BUCKET.get_district_metadata``; Streamlit
    caches the resulting frame for one hour so repeated UI interactions
    don't re-query the bucket.
    """
    district_stations = BUCKET.get_district_metadata(district_number)
    return district_stations
37 | 16 |
|
38 | 17 |
|
@st.cache_data(ttl=3600)  # Cache for 1 hour
def get_available_days() -> set:
    """Discover which days have data by inspecting S3 prefixes.

    Data prefixes embed the day as ``day=<digits>``; the captured digits are
    converted to ``int`` via ``match_func`` before the bucket helper collects
    them. NOTE(review): annotated ``-> set`` — actual container type depends on
    ``BUCKET.get_prefixes``; confirm against pems_data.
    """
    # "day=" followed by one or more digits; group 1 captures just the digits.
    day_pattern = re.compile(r"day=(\d+)")

    return BUCKET.get_prefixes(
        day_pattern,
        initial_prefix=BUCKET.imputation_detector_agg_5min,
        match_func=lambda m: int(m.group(1)),
    )
59 | 31 |
|
60 | 32 |
|
@st.cache_data(ttl=3600)  # Cache for 1 hour
def load_station_data(station_id: str) -> pd.DataFrame:
    """Fetch the data for a single station.

    Delegates to ``BUCKET.get_imputed_agg_5min``; Streamlit caches the
    result for one hour per ``station_id``.
    """
    station_frame = BUCKET.get_imputed_agg_5min(station_id)
    return station_frame
80 | 39 |
|
81 | 40 |
|
82 | 41 | # --- STREAMLIT APP ---
|
|
0 commit comments