Commit 46be947

refactor(pems_streamlit): use pems_data for S3 access
1 parent dd578a0 commit 46be947

2 files changed: 15 additions, 57 deletions


pems_streamlit/pyproject.toml

Lines changed: 1 addition & 2 deletions
@@ -4,9 +4,8 @@ description = "The Streamlit application for PeMS data visualizations."
 dynamic = ["version"]
 requires-python = ">=3.12"
 dependencies = [
-    "boto3==1.39.7",
     "django==5.2.3",
-    "pandas==2.3.0",
+    "pems_data @ file:./pems_data",
     "streamlit==1.45.1",
 ]
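
The swap above removes direct `boto3` S3 access and the pinned `pandas` in favor of the local `pems_data` package, installed as a path dependency. Note that the application module below still does `import pandas as pd`, so `pandas` presumably now arrives transitively through `pems_data`. For orientation, here is a minimal sketch of the `StationsBucket` surface the refactored code calls into. It is reconstructed from the inline logic this commit deletes (see the second file), not from the actual `pems_data` source: only the method and attribute names used at the call sites are confirmed by the diff, and everything else here is an assumption.

```python
import re

import boto3
import pandas as pd


class StationsBucket:
    """Hypothetical sketch of pems_data.stations.StationsBucket."""

    # Values taken from the constants this commit deletes from the app.
    name = "caltrans-pems-prd-us-west-2-marts"  # was S3_BUCKET
    stations_metadata_key = "geo/current_stations.parquet"  # was STATIONS_METADATA_KEY
    imputation_detector_agg_5min = "imputation/detector_imputed_agg_five_minutes"  # was DATA_PREFIX

    def get_district_metadata(self, district_number: str) -> pd.DataFrame:
        # The deleted inline version also projected a fixed column list
        # (STATION_ID, NAME, LATITUDE, ...), omitted here for brevity.
        return pd.read_parquet(
            f"s3://{self.name}/{self.stations_metadata_key}",
            filters=[("DISTRICT", "=", district_number)],
        )

    def get_prefixes(self, pattern: re.Pattern, initial_prefix: str, match_func) -> list:
        # List keys under the prefix and collect whatever match_func extracts
        # from each regex hit; mirrors the loop deleted from get_available_days.
        s3 = boto3.client("s3")
        response = s3.list_objects(Bucket=self.name, Prefix=initial_prefix)
        values = set()
        for item in response.get("Contents", []):
            found = pattern.search(item["Key"])
            if found:
                values.add(match_func(found))
        return sorted(values)

    def get_imputed_agg_5min(self, station_id: str) -> pd.DataFrame:
        return pd.read_parquet(
            f"s3://{self.name}/{self.imputation_detector_agg_5min}",
            filters=[("STATION_ID", "=", station_id)],
        )
```

Centralizing the bucket name and key prefixes on a class like this is what lets the app drop its three module-level constants.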

pems_streamlit/src/pems_streamlit/apps/stations/app_stations.py

Lines changed: 14 additions & 55 deletions
@@ -1,82 +1,41 @@
 import re
-import boto3
+
 import pandas as pd
 import streamlit as st
 
-S3_BUCKET = "caltrans-pems-prd-us-west-2-marts"
-STATIONS_METADATA_KEY = "geo/current_stations.parquet"
-DATA_PREFIX = "imputation/detector_imputed_agg_five_minutes"
+from pems_data.stations import StationsBucket
+
+
+BUCKET = StationsBucket()
 
 
 @st.cache_data(ttl=3600)  # Cache for 1 hour
 def load_station_metadata(district_number: str) -> pd.DataFrame:
     """Loads metadata for all stations in the selected District from S3."""
-
-    filters = [("DISTRICT", "=", district_number)]
-
-    return pd.read_parquet(
-        f"s3://{S3_BUCKET}/{STATIONS_METADATA_KEY}",
-        columns=[
-            "STATION_ID",
-            "NAME",
-            "PHYSICAL_LANES",
-            "STATE_POSTMILE",
-            "ABSOLUTE_POSTMILE",
-            "LATITUDE",
-            "LONGITUDE",
-            "LENGTH",
-            "STATION_TYPE",
-            "DISTRICT",
-            "FREEWAY",
-            "DIRECTION",
-            "COUNTY_NAME",
-            "CITY_NAME",
-        ],
-        filters=filters,
-    )
+    return BUCKET.get_district_metadata(district_number)
 
 
 @st.cache_data(ttl=3600)  # Cache for 1 hour
 def get_available_days() -> set:
     """
     Lists available days by inspecting S3 prefixes.
     """
+    # Find "day=", then capture one or more digits that immediately follow it
+    pattern = re.compile(r"day=(\d+)")
 
-    s3 = boto3.client("s3")
-    s3_keys = s3.list_objects(Bucket=S3_BUCKET, Prefix=DATA_PREFIX)
+    # add as int only the text captured by the first set of parentheses to the set
+    def match(m: re.Match):
+        return int(m.group(1))
 
-    days = set()
-
-    for item in s3_keys["Contents"]:
-        s3_path = item["Key"]
-        # Find "day=", then capture one or more digits that immediately follow it
-        match = re.search(r"day=(\d+)", s3_path)
-        if match:
-            # add as int only the text captured by the first set of parentheses to the set
-            days.add(int(match.group(1)))
-
-    return sorted(days)
+    return BUCKET.get_prefixes(pattern, initial_prefix=BUCKET.imputation_detector_agg_5min, match_func=match)
 
 
+@st.cache_data(ttl=3600)  # Cache for 1 hour
 def load_station_data(station_id: str) -> pd.DataFrame:
     """
     Loads station data for a specific station.
     """
-
-    filters = [("STATION_ID", "=", station_id)]
-
-    return pd.read_parquet(
-        f"s3://{S3_BUCKET}/{DATA_PREFIX}",
-        columns=[
-            "STATION_ID",
-            "LANE",
-            "SAMPLE_TIMESTAMP",
-            "VOLUME_SUM",
-            "SPEED_FIVE_MINS",
-            "OCCUPANCY_AVG",
-        ],
-        filters=filters,
-    )
+    return BUCKET.get_imputed_agg_5min(station_id)
 
 
 # --- STREAMLIT APP ---
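
In `get_available_days`, the refactor keeps the domain knowledge in the app (the `day=(\d+)` regex and the `match` function that turns a capture into an `int`) and hands the actual S3 listing to `BUCKET.get_prefixes`. Here is a standalone sketch of how those two pieces compose, run against hypothetical object keys under the real `imputation/detector_imputed_agg_five_minutes` prefix; the loop mirrors the implementation this commit deletes:

```python
import re

# Same regex and conversion the refactored function passes to get_prefixes.
pattern = re.compile(r"day=(\d+)")


def match(m: re.Match) -> int:
    # Keep only the digits captured by the first group, as an int.
    return int(m.group(1))


# Hypothetical keys; the day=<n> partitioning is implied by the regex.
keys = [
    "imputation/detector_imputed_agg_five_minutes/day=3/part-0.parquet",
    "imputation/detector_imputed_agg_five_minutes/day=12/part-0.parquet",
    "imputation/detector_imputed_agg_five_minutes/day=3/part-1.parquet",
]

# What get_prefixes presumably does with them, per the deleted loop:
days = set()
for key in keys:
    found = pattern.search(key)
    if found:
        days.add(match(found))

print(sorted(days))  # [3, 12]
```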
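
A behavioral change that is easy to miss in the diff above: `load_station_data` gains `@st.cache_data(ttl=3600)`, which it did not have before, so per-station reads are now memoized for an hour like the other two loaders. A minimal illustration of the caching semantics with a hypothetical function: Streamlit keys cache entries on the argument values, so the body re-runs only for unseen arguments or once the TTL lapses.

```python
import streamlit as st


@st.cache_data(ttl=3600)  # entries expire after one hour
def square(x: int) -> int:
    print("cache miss: computing")  # printed only when the body actually runs
    return x * x


square(4)  # first call with 4: body runs
square(4)  # repeat within the TTL: served from cache
square(5)  # unseen argument: body runs again
```

This also suggests why `BUCKET` lives at module level rather than being passed in: cached functions hash their arguments, and keeping the bucket object out of every signature avoids that entirely.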
