Split vp preprocessing out #1538

Draft · wants to merge 3 commits into main
2 changes: 1 addition & 1 deletion _shared_utils/shared_utils/rt_dates.py
@@ -62,7 +62,7 @@
"feb2025": "2025-02-12",
"mar2025": "2025-03-12",
"apr2025": "2025-04-16",
"may2025": "2025-05-14",
"jun2025": "2025-06-11",
}

years_available = list(range(2023, datetime.datetime.now().year + 1))
1 change: 1 addition & 0 deletions gtfs_funnel/Makefile
@@ -13,6 +13,7 @@ preprocess_schedule_vp_dependency:

preprocess_vp:
python vp_keep_usable.py
python vp_direction.py
python cleanup.py
python vp_dwell_time.py
python vp_condenser.py
6 changes: 5 additions & 1 deletion gtfs_funnel/download_stop_times.py
@@ -14,6 +14,9 @@
from shared_utils import gtfs_utils_v2
from update_vars import GTFS_DATA_DICT, COMPILED_CACHED_VIEWS

import google.auth
credentials, _ = google.auth.default()

def download_one_day(analysis_date: str):
"""
Download single day for stop_times.
@@ -22,7 +25,8 @@ def download_one_day(analysis_date: str):
start = dt.datetime.now()

full_trips = pd.read_parquet(
f"{COMPILED_CACHED_VIEWS}trips_{analysis_date}.parquet")
f"{COMPILED_CACHED_VIEWS}trips_{analysis_date}.parquet",
)

FEEDS_TO_RUN = full_trips.feed_key.unique().tolist()

3 changes: 3 additions & 0 deletions gtfs_funnel/logs/vp_preprocessing.log
@@ -102,3 +102,6 @@
2025-05-19 16:27:59.567 | INFO | __main__:<module>:241 - merge with original and export: 0:02:14.664176
2025-05-19 16:27:59.572 | INFO | __main__:<module>:242 - vp with dwell time 2025-05-14: 0:08:28.700960
2025-05-19 16:41:59.638 | INFO | __main__:<module>:81 - 2025-05-14: condense vp for trip 0:03:01.743441
2025-06-24 17:34:19.943 | INFO | __main__:get_vp_direction_column:127 - export vp direction: 0:00:00
2025-06-24 17:39:37.264 | INFO | __main__:merge_in_vp_direction:75 - 2025-06-11: export usable vp with direction: 0:01:45.602199
2025-06-24 17:39:37.557 | INFO | __main__:<module>:158 - 2025-06-11: add vp direction execution time: 0:01:45.895240
9 changes: 7 additions & 2 deletions gtfs_funnel/raw_vp_comparison.py
@@ -7,6 +7,9 @@
from segment_speed_utils import segment_calcs
from update_vars import GTFS_DATA_DICT, SEGMENT_GCS

import google.auth
credentials, _ = google.auth.default()

def merge_raw_and_grouped(
vp: gpd.GeoDataFrame,
vp_grouped: gpd.GeoDataFrame,
@@ -77,12 +80,14 @@ def merge_raw_and_grouped(

vp = delayed(gpd.read_parquet)(
f"{SEGMENT_GCS}{RAW_VP}_{analysis_date}.parquet",
columns = subset_cols
columns = subset_cols,
storage_options={"token": credentials.token},
).dropna(subset="trip_instance_key")

vp_grouped = delayed(gpd.read_parquet)(
f"{SEGMENT_GCS}{VP_GROUPED}_{analysis_date}.parquet",
columns = subset_cols + ["moving_timestamp_local", "n_vp"]
columns = subset_cols + ["moving_timestamp_local", "n_vp"],
storage_options={"token": credentials.token},
).dropna(subset="trip_instance_key")

results = delayed(merge_raw_and_grouped)(vp, vp_grouped)
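
These changes repeat one pattern across the funnel scripts: obtain application default credentials once at import time, then pass the token to the parquet readers via storage_options so the underlying GCS filesystem can authenticate. Below is a minimal sketch of that pattern, assuming application default credentials are already configured in the environment and using a hypothetical bucket path:

import google.auth
import pandas as pd

# Application default credentials: whatever service account or user
# credentials are already configured in the environment.
credentials, _ = google.auth.default()

# storage_options is passed through to the GCS filesystem so the read
# is authenticated with the token obtained above.
df = pd.read_parquet(
    "gs://example-bucket/example_table.parquet",  # hypothetical path
    storage_options={"token": credentials.token},
)
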
4 changes: 4 additions & 0 deletions gtfs_funnel/route_typologies.py
@@ -31,6 +31,9 @@
from update_vars import SHARED_GCS, SCHED_GCS, COMPILED_CACHED_VIEWS, GTFS_DATA_DICT
import nacto_utils

import google.auth
credentials, _ = google.auth.default()

route_cols = ["schedule_gtfs_dataset_key", "route_id"]

typology_cols = ["freq_category", "typology"]
@@ -105,6 +108,7 @@ def prep_roads(year: str, buffer_meters: int, dict_inputs: dict) -> gpd.GeoDataF
roads = gpd.read_parquet(
f"{SHARED_GCS}{ROAD_SEGMENTS}.parquet",
columns = road_segment_cols + ["geometry"],
storage_options={"token": credentials.token}
).to_crs(PROJECT_CRS)

road_stats = road_stats.assign(
6 changes: 5 additions & 1 deletion gtfs_funnel/schedule_stats_by_route_direction.py
@@ -12,6 +12,9 @@
from shared_utils.rt_utils import METERS_PER_MILE
from update_vars import GTFS_DATA_DICT, RT_SCHED_GCS

import google.auth
credentials, _ = google.auth.default()

def cardinal_direction_by_trip(
stop_times: gpd.GeoDataFrame,
group_cols: list
@@ -55,7 +58,8 @@ def assemble_scheduled_trip_metrics(

# Load files
df = gpd.read_parquet(
f"{RT_SCHED_GCS}{STOP_TIMES_FILE}_{analysis_date}.parquet"
f"{RT_SCHED_GCS}{STOP_TIMES_FILE}_{analysis_date}.parquet",
storage_options={"token": credentials.token}
)

trip_cols = ["trip_instance_key"]
4 changes: 4 additions & 0 deletions gtfs_funnel/stop_arrivals_in_roads.py
@@ -17,6 +17,9 @@
road_cols = ["linearid", "mtfcc", "fullname"]
road_segment_cols = road_cols + ["segment_sequence"]

import google.auth
credentials, _ = google.auth.default()

def buffer_roads(road_file: str, buffer_meters: int) -> gpd.GeoDataFrame:
"""
Buffer 2 mile road segments
@@ -25,6 +28,7 @@ def buffer_roads(road_file: str, buffer_meters: int) -> gpd.GeoDataFrame:
f"{SHARED_GCS}"
f"{road_file}.parquet",
columns = road_segment_cols + ["geometry"],
storage_options={"token": credentials.token}
).to_crs(PROJECT_CRS)

df = df.assign(
2 changes: 1 addition & 1 deletion gtfs_funnel/update_vars.py
@@ -11,7 +11,7 @@
apr2024_week + oct2024_week
)

analysis_date_list = [rt_dates.DATES['may2025']]
analysis_date_list = [rt_dates.DATES['jun2025']]

GTFS_DATA_DICT = catalog_utils.get_catalog("gtfs_analytics_data")

162 changes: 162 additions & 0 deletions gtfs_funnel/vp_direction.py
@@ -0,0 +1,162 @@
"""
Pre-processing vehicle positions.
Drop all RT trips with less than 10 min of data.
Add direction to vp.
Doing this with geopandas gdfs will crash kernel (2 geom cols too much).
Doing this with dask_geopandas gddfs takes ~25 min.
Doing this with dask ddfs (x, y) coords takes ~7 min.
Doing this with dask ddfs + np arrays takes ~4 min. (but persisting takes another 4 min)
Doing this with pandas and numpy arrays takes ~8 min.
"""
import datetime
import geopandas as gpd
import numpy as np
import pandas as pd
import sys

from loguru import logger

from calitp_data_analysis import utils
from calitp_data_analysis.geography_utils import WGS84
from segment_speed_utils.project_vars import PROJECT_CRS
from segment_speed_utils import vp_transform
from shared_utils import publish_utils, rt_utils
from update_vars import GTFS_DATA_DICT, SEGMENT_GCS

import google.auth
credentials, _ = google.auth.default()

def merge_in_vp_direction(
analysis_date: str,
dict_inputs: dict = {}
):
"""
Merge staged vp_usable with the vp direction results
and export.
"""
time0 = datetime.datetime.now()

INPUT_FILE = dict_inputs.speeds_tables.usable_vp

vp_direction = pd.read_parquet(
f"{SEGMENT_GCS}vp_direction_{analysis_date}.parquet"
)

# By the end of add_vp_direction, we return df, not gdf
# Let's convert to tabular now, make use of partitioning
vp = gpd.read_parquet(
f"{SEGMENT_GCS}{INPUT_FILE}_{analysis_date}_stage.parquet",
storage_options={"token": credentials.token}
).to_crs(WGS84).merge(
vp_direction,
on = "vp_idx",
how = "inner"
)

vp = vp.assign(
x = vp.geometry.x,
y = vp.geometry.y
).drop(columns = "geometry")

export_path = f"{SEGMENT_GCS}{INPUT_FILE}_{analysis_date}"

publish_utils.if_exists_then_delete(export_path)

vp.to_parquet(
export_path,
partition_cols = "gtfs_dataset_key",
# We delete the entire folder of partitioned parquets above (if_exists_then_delete);
# otherwise, existing_data_behavior="delete_matching" would only overwrite
# partitions that happen to share the same name.
#existing_data_behavior = "delete_matching"
)

time1 = datetime.datetime.now()
logger.info(f"{analysis_date}: export usable vp with direction: {time1 - time0}")

return


def get_vp_direction_column(
analysis_date: str,
dict_inputs: dict = {}
):
"""
"""
time0 = datetime.datetime.now()

USABLE_VP = dict_inputs.speeds_tables.usable_vp

vp_gdf = gpd.read_parquet(
f"{SEGMENT_GCS}{USABLE_VP}_{analysis_date}_stage.parquet",
columns = ["trip_instance_key", "vp_idx", "geometry"],
storage_options={"token": credentials.token}
).to_crs(PROJECT_CRS)

vp_condensed = vp_transform.condense_point_geom_to_line(
vp_gdf,
group_cols = ["trip_instance_key"],
#sort_cols = ["trip_instance_key", "vp_idx"], not used?
array_cols = ["vp_idx", "geometry"]
)

vp_direction_series = []

for row in vp_condensed.itertuples():
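# One trip's points in vp_idx order; the first vp has no prior point, so it
# is labeled "Unknown" below, and each later vp is compared to its predecessor.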
vp_geom = np.array(getattr(row, "geometry"))
next_vp_geom = vp_geom[1:]

vp_direction = np.array(
["Unknown"] +
[rt_utils.primary_cardinal_direction(prior_vp, current_vp)
for prior_vp, current_vp in zip(vp_geom, next_vp_geom)
])

vp_direction_series.append(vp_direction)

keep_cols = ["vp_idx", "vp_primary_direction"]

vp_condensed = vp_condensed.assign(
vp_primary_direction = vp_direction_series
)[keep_cols].explode(column=keep_cols)

vp_condensed.to_parquet(
f"{SEGMENT_GCS}vp_direction_{analysis_date}.parquet"
)

time1 = datetime.datetime.now()
logger.info(f"export vp direction: {time1 - time1}")

return


if __name__ == "__main__":

from update_vars import analysis_date_list

LOG_FILE = "./logs/vp_preprocessing.log"
logger.add(LOG_FILE, retention="3 months")
logger.add(sys.stderr,
format="{time:YYYY-MM-DD at HH:mm:ss} | {level} | {message}",
level="INFO")

for analysis_date in analysis_date_list:
start = datetime.datetime.now()

# Add in direction of travel
#get_vp_direction_column(
# analysis_date,
# GTFS_DATA_DICT
#)

merge_in_vp_direction(
analysis_date,
GTFS_DATA_DICT
)

end = datetime.datetime.now()
logger.info(
f"{analysis_date}: add vp direction execution time: "
f"{end - start}"
)
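
For reference, the loop above labels the first vp of each trip "Unknown" and then compares each vp to its predecessor with rt_utils.primary_cardinal_direction. The sketch below illustrates that pairwise idea with made-up coordinates; the direction helper here is a stand-in written for this example, not the actual rt_utils implementation.

from shapely.geometry import Point

def sketch_cardinal_direction(prior: Point, current: Point) -> str:
    # Stand-in for rt_utils.primary_cardinal_direction (not the real
    # implementation): report the axis with the larger movement.
    dx, dy = current.x - prior.x, current.y - prior.y
    if abs(dx) >= abs(dy):
        return "Eastbound" if dx > 0 else "Westbound"
    return "Northbound" if dy > 0 else "Southbound"

# One trip's vp points in order (made-up coordinates); the first vp has no
# predecessor, so it is labeled "Unknown", mirroring the loop above.
vp_geom = [Point(0, 0), Point(10, 1), Point(11, 25)]
directions = ["Unknown"] + [
    sketch_cardinal_direction(prior, current)
    for prior, current in zip(vp_geom, vp_geom[1:])
]
print(directions)  # ['Unknown', 'Eastbound', 'Northbound']
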

1 change: 0 additions & 1 deletion gtfs_funnel/vp_dwell_time.py
@@ -12,7 +12,6 @@
from shared_utils import publish_utils
from update_vars import GTFS_DATA_DICT


def import_vp(analysis_date: str, **kwargs) -> pd.DataFrame:
"""
Import vehicle positions for this script,