diff --git a/_shared_utils/shared_utils/rt_dates.py b/_shared_utils/shared_utils/rt_dates.py index 988145946b..5f56314f58 100644 --- a/_shared_utils/shared_utils/rt_dates.py +++ b/_shared_utils/shared_utils/rt_dates.py @@ -62,7 +62,7 @@ "feb2025": "2025-02-12", "mar2025": "2025-03-12", "apr2025": "2025-04-16", - "may2025": "2025-05-14", + "jun2025": "2025-06-11", } years_available = list(range(2023, datetime.datetime.now().year + 1)) diff --git a/gtfs_funnel/Makefile b/gtfs_funnel/Makefile index 3756100ea0..ee532fbe49 100644 --- a/gtfs_funnel/Makefile +++ b/gtfs_funnel/Makefile @@ -13,6 +13,7 @@ preprocess_schedule_vp_dependency: preprocess_vp: python vp_keep_usable.py + python vp_direction.py python cleanup.py python vp_dwell_time.py python vp_condenser.py diff --git a/gtfs_funnel/download_stop_times.py b/gtfs_funnel/download_stop_times.py index 375e085ad6..4f1413749e 100644 --- a/gtfs_funnel/download_stop_times.py +++ b/gtfs_funnel/download_stop_times.py @@ -14,6 +14,9 @@ from shared_utils import gtfs_utils_v2 from update_vars import GTFS_DATA_DICT, COMPILED_CACHED_VIEWS +import google.auth +credentials, _ = google.auth.default() + def download_one_day(analysis_date: str): """ Download single day for stop_times. @@ -22,7 +25,8 @@ def download_one_day(analysis_date: str): start = dt.datetime.now() full_trips = pd.read_parquet( - f"{COMPILED_CACHED_VIEWS}trips_{analysis_date}.parquet") + f"{COMPILED_CACHED_VIEWS}trips_{analysis_date}.parquet", + ) FEEDS_TO_RUN = full_trips.feed_key.unique().tolist() diff --git a/gtfs_funnel/logs/vp_preprocessing.log b/gtfs_funnel/logs/vp_preprocessing.log index 1e34576179..b1ea1c0679 100644 --- a/gtfs_funnel/logs/vp_preprocessing.log +++ b/gtfs_funnel/logs/vp_preprocessing.log @@ -102,3 +102,6 @@ 2025-05-19 16:27:59.567 | INFO | __main__::241 - merge with original and export: 0:02:14.664176 2025-05-19 16:27:59.572 | INFO | __main__::242 - vp with dwell time 2025-05-14: 0:08:28.700960 2025-05-19 16:41:59.638 | INFO | __main__::81 - 2025-05-14: condense vp for trip 0:03:01.743441 +2025-06-24 17:34:19.943 | INFO | __main__:get_vp_direction_column:127 - export vp direction: 0:00:00 +2025-06-24 17:39:37.264 | INFO | __main__:merge_in_vp_direction:75 - 2025-06-11: export usable vp with direction: 0:01:45.602199 +2025-06-24 17:39:37.557 | INFO | __main__::158 - 2025-06-11: add vp direction execution time: 0:01:45.895240 diff --git a/gtfs_funnel/raw_vp_comparison.py b/gtfs_funnel/raw_vp_comparison.py index 551b0c050d..6b126c5a98 100644 --- a/gtfs_funnel/raw_vp_comparison.py +++ b/gtfs_funnel/raw_vp_comparison.py @@ -7,6 +7,9 @@ from segment_speed_utils import segment_calcs from update_vars import GTFS_DATA_DICT, SEGMENT_GCS +import google.auth +credentials, _ = google.auth.default() + def merge_raw_and_grouped( vp: gpd.GeoDataFrame, vp_grouped: gpd.GeoDataFrame, @@ -77,12 +80,14 @@ def merge_raw_and_grouped( vp = delayed(gpd.read_parquet)( f"{SEGMENT_GCS}{RAW_VP}_{analysis_date}.parquet", - columns = subset_cols + columns = subset_cols, + storage_options={"token": credentials.token}, ).dropna(subset="trip_instance_key") vp_grouped = delayed(gpd.read_parquet)( f"{SEGMENT_GCS}{VP_GROUPED}_{analysis_date}.parquet", - columns = subset_cols + ["moving_timestamp_local", "n_vp"] + columns = subset_cols + ["moving_timestamp_local", "n_vp"], + storage_options={"token": credentials.token}, ).dropna(subset="trip_instance_key") results = delayed(merge_raw_and_grouped)(vp, vp_grouped) diff --git a/gtfs_funnel/route_typologies.py b/gtfs_funnel/route_typologies.py index 
c402c86519..3e4fc480c6 100644 --- a/gtfs_funnel/route_typologies.py +++ b/gtfs_funnel/route_typologies.py @@ -31,6 +31,9 @@ from update_vars import SHARED_GCS, SCHED_GCS, COMPILED_CACHED_VIEWS, GTFS_DATA_DICT import nacto_utils +import google.auth +credentials, _ = google.auth.default() + route_cols = ["schedule_gtfs_dataset_key", "route_id"] typology_cols = ["freq_category", "typology"] @@ -105,6 +108,7 @@ def prep_roads(year: str, buffer_meters: int, dict_inputs: dict) -> gpd.GeoDataF roads = gpd.read_parquet( f"{SHARED_GCS}{ROAD_SEGMENTS}.parquet", columns = road_segment_cols + ["geometry"], + storage_options={"token": credentials.token} ).to_crs(PROJECT_CRS) road_stats = road_stats.assign( diff --git a/gtfs_funnel/schedule_stats_by_route_direction.py b/gtfs_funnel/schedule_stats_by_route_direction.py index abeeeae6a1..9266b9dd83 100644 --- a/gtfs_funnel/schedule_stats_by_route_direction.py +++ b/gtfs_funnel/schedule_stats_by_route_direction.py @@ -12,6 +12,9 @@ from shared_utils.rt_utils import METERS_PER_MILE from update_vars import GTFS_DATA_DICT, RT_SCHED_GCS +import google.auth +credentials, _ = google.auth.default() + def cardinal_direction_by_trip( stop_times: gpd.GeoDataFrame, group_cols: list @@ -55,7 +58,8 @@ def assemble_scheduled_trip_metrics( # Load files df = gpd.read_parquet( - f"{RT_SCHED_GCS}{STOP_TIMES_FILE}_{analysis_date}.parquet" + f"{RT_SCHED_GCS}{STOP_TIMES_FILE}_{analysis_date}.parquet", + storage_options={"token": credentials.token} ) trip_cols = ["trip_instance_key"] diff --git a/gtfs_funnel/stop_arrivals_in_roads.py b/gtfs_funnel/stop_arrivals_in_roads.py index 6fd1ab1c20..86f2f748c2 100644 --- a/gtfs_funnel/stop_arrivals_in_roads.py +++ b/gtfs_funnel/stop_arrivals_in_roads.py @@ -17,6 +17,9 @@ road_cols = ["linearid", "mtfcc", "fullname"] road_segment_cols = road_cols + ["segment_sequence"] +import google.auth +credentials, _ = google.auth.default() + def buffer_roads(road_file: str, buffer_meters: int) -> gpd.GeoDataFrame: """ Buffer 2 mile road segments @@ -25,6 +28,7 @@ def buffer_roads(road_file: str, buffer_meters: int) -> gpd.GeoDataFrame: f"{SHARED_GCS}" f"{road_file}.parquet", columns = road_segment_cols + ["geometry"], + storage_options={"token": credentials.token} ).to_crs(PROJECT_CRS) df = df.assign( diff --git a/gtfs_funnel/update_vars.py b/gtfs_funnel/update_vars.py index be3604bf60..589efda6f2 100644 --- a/gtfs_funnel/update_vars.py +++ b/gtfs_funnel/update_vars.py @@ -11,7 +11,7 @@ apr2024_week + oct2024_week ) -analysis_date_list = [rt_dates.DATES['may2025']] +analysis_date_list = [rt_dates.DATES['jun2025']] GTFS_DATA_DICT = catalog_utils.get_catalog("gtfs_analytics_data") diff --git a/gtfs_funnel/vp_direction.py b/gtfs_funnel/vp_direction.py new file mode 100644 index 0000000000..41a3179684 --- /dev/null +++ b/gtfs_funnel/vp_direction.py @@ -0,0 +1,162 @@ +""" +Pre-processing vehicle positions. +Drop all RT trips with less than 10 min of data. +Add direction to vp. + +Doing this with geopandas gdfs will crash kernel (2 geom cols too much). +Doing this with dask_geopandas gddfs takes ~25 min. +Doing this with dask ddfs (x, y) coords takes ~7 min. +Doing this with dask ddfs + np arrays takes ~4 min. (but persisting takes another 4 min) +Doing this with pandas and numpy arrays takes ~8 min. 
+""" +import datetime +import geopandas as gpd +import numpy as np +import pandas as pd +import sys + +from loguru import logger + +from calitp_data_analysis import utils +from calitp_data_analysis.geography_utils import WGS84 +from segment_speed_utils.project_vars import PROJECT_CRS +from segment_speed_utils import vp_transform +from shared_utils import publish_utils, rt_utils +from update_vars import GTFS_DATA_DICT, SEGMENT_GCS + +import google.auth +credentials, _ = google.auth.default() + +def merge_in_vp_direction( + analysis_date: str, + dict_inputs: dict = {} +): + """ + Merge staged vp_usable with the vp direction results + and export. + """ + time0 = datetime.datetime.now() + + INPUT_FILE = dict_inputs.speeds_tables.usable_vp + + vp_direction = pd.read_parquet( + f"{SEGMENT_GCS}vp_direction_{analysis_date}.parquet" + ) + + # By the end of add_vp_direction, we return df, not gdf + # Let's convert to tabular now, make use of partitioning + vp = gpd.read_parquet( + f"{SEGMENT_GCS}{INPUT_FILE}_{analysis_date}_stage.parquet", + storage_options={"token": credentials.token} + ).to_crs(WGS84).merge( + vp_direction, + on = "vp_idx", + how = "inner" + ) + + vp = vp.assign( + x = vp.geometry.x, + y = vp.geometry.y + ).drop(columns = "geometry") + + export_path = f"{SEGMENT_GCS}{INPUT_FILE}_{analysis_date}" + + publish_utils.if_exists_then_delete(export_path) + + vp.to_parquet( + export_path, + partition_cols = "gtfs_dataset_key", + # if we don't delete the entire folder of partitioned parquets, this + # can delete it if the partitions have the same name + #existing_data_behavior = "delete_matching" + ) + + time1 = datetime.datetime.now() + logger.info(f"{analysis_date}: export usable vp with direction: {time1 - time0}") + + return + + +def get_vp_direction_column( + analysis_date: str, + dict_inputs: dict = {} +): + """ + """ + time0 = datetime.datetime.now() + + USABLE_VP = dict_inputs.speeds_tables.usable_vp + + vp_gdf = gpd.read_parquet( + f"{SEGMENT_GCS}{USABLE_VP}_{analysis_date}_stage.parquet", + columns = ["trip_instance_key", "vp_idx", "geometry"], + storage_options={"token": credentials.token} + ).to_crs(PROJECT_CRS) + + vp_condensed = vp_transform.condense_point_geom_to_line( + vp_gdf, + group_cols = ["trip_instance_key"], + #sort_cols = ["trip_instance_key", "vp_idx"], not used? 
+ array_cols = ["vp_idx", "geometry"] + ) + + vp_direction_series = [] + + for row in vp_condensed.itertuples(): + vp_geom = np.array(getattr(row, "geometry")) + next_vp_geom = vp_geom[1:] + + vp_direction = np.array( + ["Unknown"] + + [rt_utils.primary_cardinal_direction(prior_vp, current_vp) + for prior_vp, current_vp in zip(vp_geom, next_vp_geom) + ]) + + vp_direction_series.append(vp_direction) + + keep_cols = ["vp_idx", "vp_primary_direction"] + + vp_condensed = vp_condensed.assign( + vp_primary_direction = vp_direction_series + )[keep_cols].explode(column=keep_cols) + + vp_condensed.to_parquet( + f"{SEGMENT_GCS}vp_direction_{analysis_date}.parquet" + ) + + time1 = datetime.datetime.now() + logger.info(f"export vp direction: {time1 - time0}") + + return + + +if __name__ == "__main__": + + from update_vars import analysis_date_list + + LOG_FILE = "./logs/vp_preprocessing.log" + logger.add(LOG_FILE, retention="3 months") + logger.add(sys.stderr, + format="{time:YYYY-MM-DD at HH:mm:ss} | {level} | {message}", + level="INFO") + + for analysis_date in analysis_date_list: + start = datetime.datetime.now() + + # Add in direction of travel + #get_vp_direction_column( + # analysis_date, + # GTFS_DATA_DICT + #) + + merge_in_vp_direction( + analysis_date, + GTFS_DATA_DICT + ) + + end = datetime.datetime.now() + logger.info( + f"{analysis_date}: add vp direction execution time: " + f"{end - start}" + ) + \ No newline at end of file diff --git a/gtfs_funnel/vp_dwell_time.py b/gtfs_funnel/vp_dwell_time.py index fe8502ab42..a95c5d4b2e 100644 --- a/gtfs_funnel/vp_dwell_time.py +++ b/gtfs_funnel/vp_dwell_time.py @@ -12,7 +12,6 @@ from shared_utils import publish_utils from update_vars import GTFS_DATA_DICT - def import_vp(analysis_date: str, **kwargs) -> pd.DataFrame: """ Import vehicle positions for this script, diff --git a/gtfs_funnel/vp_keep_usable.py b/gtfs_funnel/vp_keep_usable.py index e0036a9c14..bc313d21bd 100644 --- a/gtfs_funnel/vp_keep_usable.py +++ b/gtfs_funnel/vp_keep_usable.py @@ -19,11 +19,11 @@ from calitp_data_analysis import utils from calitp_data_analysis.geography_utils import WGS84 -from segment_speed_utils.project_vars import PROJECT_CRS -from segment_speed_utils import vp_transform -from shared_utils import geo_utils, publish_utils, rt_utils from update_vars import GTFS_DATA_DICT, SEGMENT_GCS +import google.auth +credentials, _ = google.auth.default() + def find_valid_trips( vp: pd.DataFrame, timestamp_col: str, @@ -62,6 +62,7 @@ def pare_down_to_valid_trips( vp = gpd.read_parquet( f"{SEGMENT_GCS}{INPUT_FILE}_{analysis_date}.parquet", + storage_options={"token": credentials.token} ).to_crs(WGS84) usable_trips = find_valid_trips(vp, TIMESTAMP_COL, TIME_CUTOFF) @@ -87,106 +88,8 @@ def pare_down_to_valid_trips( time1 = datetime.datetime.now() logger.info(f"pare down vp: {time1 - time0}") - # Add in direction of travel - get_vp_direction_column(vp) - - time2 = datetime.datetime.now() - logger.info(f"export vp direction: {time2 - time1}") - return - - -def merge_in_vp_direction( - analysis_date: str, - dict_inputs: dict = {} -): - """ - Merge staged vp_usable with the vp direction results - and export. 
- """ - time0 = datetime.datetime.now() - - INPUT_FILE = dict_inputs.speeds_tables.usable_vp - - vp_direction = pd.read_parquet( - f"{SEGMENT_GCS}vp_direction_{analysis_date}.parquet" - ) - - # By the end of add_vp_direction, we return df, not gdf - # Let's convert to tabular now, make use of partitioning - vp = gpd.read_parquet( - f"{SEGMENT_GCS}{INPUT_FILE}_{analysis_date}_stage.parquet", - ).to_crs(WGS84).merge( - vp_direction, - on = "vp_idx", - how = "inner" - ) - - vp = vp.assign( - x = vp.geometry.x, - y = vp.geometry.y - ).drop(columns = "geometry") - - export_path = f"{SEGMENT_GCS}{INPUT_FILE}_{analysis_date}" - - publish_utils.if_exists_then_delete(export_path) - - vp.to_parquet( - export_path, - partition_cols = "gtfs_dataset_key", - # if we don't delete the entire folder of partitioned parquets, this - # can delete it if the partitions have the same name - #existing_data_behavior = "delete_matching" - ) - - time1 = datetime.datetime.now() - logger.info(f"{analysis_date}: export usable vp with direction: {time1 - time0}") - - return - - -def get_vp_direction_column( - vp_gdf: gpd.GeoDataFrame, -) -> pd.DataFrame: - """ - """ - vp_gdf = vp_gdf[ - ["trip_instance_key", "vp_idx", "geometry"] - ].to_crs(PROJECT_CRS) - - vp_condensed = vp_transform.condense_point_geom_to_line( - vp_gdf, - group_cols = ["trip_instance_key"], -# sort_cols = ["trip_instance_key", "vp_idx"], not used? - array_cols = ["vp_idx", "geometry"] - ) - - vp_direction_series = [] - - for row in vp_condensed.itertuples(): - vp_geom = np.array(getattr(row, "geometry")) - next_vp_geom = vp_geom[1:] - - vp_direction = np.array( - ["Unknown"] + - [rt_utils.primary_cardinal_direction(prior_vp, current_vp) - for prior_vp, current_vp in zip(vp_geom, next_vp_geom) - ]) - - vp_direction_series.append(vp_direction) - - keep_cols = ["vp_idx", "vp_primary_direction"] - - vp_condensed = vp_condensed.assign( - vp_primary_direction = vp_direction_series - )[keep_cols].explode(column=keep_cols) - - vp_condensed.to_parquet( - f"{SEGMENT_GCS}vp_direction_{analysis_date}.parquet" - ) - - return - + if __name__ == "__main__": @@ -206,14 +109,9 @@ def get_vp_direction_column( GTFS_DATA_DICT ) - merge_in_vp_direction( - analysis_date, - GTFS_DATA_DICT - ) - end = datetime.datetime.now() logger.info( - f"{analysis_date}: pare down vp, add direction execution time: " + f"{analysis_date}: pare down vp execution time: " f"{end - start}" ) \ No newline at end of file diff --git a/high_quality_transit_areas/update_vars.py b/high_quality_transit_areas/update_vars.py index bab99a4c08..7c95dd34eb 100644 --- a/high_quality_transit_areas/update_vars.py +++ b/high_quality_transit_areas/update_vars.py @@ -1,7 +1,7 @@ from shared_utils import rt_dates import datetime as dt -analysis_date = rt_dates.DATES["may2025"] +analysis_date = rt_dates.DATES["jun2025"] GCS_FILE_PATH = ("gs://calitp-analytics-data/data-analyses/" "high_quality_transit_areas/") diff --git a/open_data/update_vars.py b/open_data/update_vars.py index 358038f6a9..994c97652f 100644 --- a/open_data/update_vars.py +++ b/open_data/update_vars.py @@ -1,7 +1,7 @@ from pathlib import Path from shared_utils import catalog_utils, rt_dates -analysis_date = rt_dates.DATES["may2025"] +analysis_date = rt_dates.DATES["jun2025"] GTFS_DATA_DICT = catalog_utils.get_catalog("gtfs_analytics_data") diff --git a/rt_scheduled_v_ran/scripts/update_vars.py b/rt_scheduled_v_ran/scripts/update_vars.py index a8fe7807b2..13d3c09ad5 100644 --- a/rt_scheduled_v_ran/scripts/update_vars.py +++ 
b/rt_scheduled_v_ran/scripts/update_vars.py @@ -7,7 +7,7 @@ oct2024_week = rt_dates.get_week("oct2024", exclude_wed=True) -analysis_date_list = [rt_dates.DATES["may2025"]] +analysis_date_list = [rt_dates.DATES["jun2025"]] GTFS_DATA_DICT = catalog_utils.get_catalog("gtfs_analytics_data") diff --git a/rt_segment_speeds/segment_speed_utils/project_vars.py b/rt_segment_speeds/segment_speed_utils/project_vars.py index 505e72db9f..da5a07a524 100644 --- a/rt_segment_speeds/segment_speed_utils/project_vars.py +++ b/rt_segment_speeds/segment_speed_utils/project_vars.py @@ -11,7 +11,7 @@ SHARED_GCS = GTFS_DATA_DICT.gcs_paths.SHARED_GCS PUBLIC_GCS = GTFS_DATA_DICT.gcs_paths.PUBLIC_GCS -analysis_date = rt_dates.DATES["may2025"] +analysis_date = rt_dates.DATES["jun2025"] analysis_date_list = [analysis_date] oct2023_week = rt_dates.get_week("oct2023", exclude_wed=True)
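
Note on the recurring change above: download_stop_times.py, raw_vp_comparison.py, route_typologies.py, schedule_stats_by_route_direction.py, stop_arrivals_in_roads.py, vp_keep_usable.py, and the new vp_direction.py all add the same two-line pattern, fetching application default credentials with google.auth.default() and passing the token to the parquet readers via storage_options. A minimal sketch of that pattern follows; the bucket path is hypothetical, and the explicit refresh() call is an assumption (google.auth only populates credentials.token after a refresh), not something the diff itself does.

import google.auth
from google.auth.transport.requests import Request
import geopandas as gpd

# Application default credentials: gcloud auth application-default login,
# a service account key, or workload identity, depending on the environment.
credentials, _ = google.auth.default()
credentials.refresh(Request())  # assumption: refresh so credentials.token is populated

gdf = gpd.read_parquet(
    "gs://example-bucket/vp_usable_2025-06-11_stage.parquet",  # hypothetical path
    storage_options={"token": credentials.token},
)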
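In vp_direction.py, get_vp_direction_column condenses each trip's vehicle positions into an ordered point array, then labels every point with the direction of travel from its predecessor, with the first point falling back to "Unknown". The sketch below shows that consecutive-point loop on a toy trip; primary_cardinal_direction_sketch is an assumed stand-in for rt_utils.primary_cardinal_direction, whose actual implementation and return values are not shown in this diff.

from shapely.geometry import Point

def primary_cardinal_direction_sketch(prior: Point, current: Point) -> str:
    # Assumed stand-in: take the dominant axis of displacement in projected
    # coordinates and map it to a compass heading.
    dx = current.x - prior.x
    dy = current.y - prior.y
    if abs(dx) > abs(dy):
        return "Eastbound" if dx > 0 else "Westbound"
    return "Northbound" if dy > 0 else "Southbound"

# Toy trip in projected meters; the real input comes from
# vp_transform.condense_point_geom_to_line, grouped by trip_instance_key.
points = [Point(0, 0), Point(10, 2), Point(12, 30)]

directions = ["Unknown"] + [
    primary_cardinal_direction_sketch(prior, current)
    for prior, current in zip(points, points[1:])
]
# Expected: ['Unknown', 'Eastbound', 'Northbound'] (first vp has no prior point)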
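merge_in_vp_direction then joins the exported vp_idx/vp_primary_direction pairs back onto the staged usable vp on vp_idx, converts geometry to plain x/y columns, and writes a folder of parquets partitioned by gtfs_dataset_key, deleting the previous folder first so stale partitions cannot linger. A small in-memory sketch of the join-and-partition step, with made-up values and a hypothetical local output path:

import pandas as pd

vp = pd.DataFrame({
    "vp_idx": [0, 1, 2],
    "gtfs_dataset_key": ["key_a", "key_a", "key_b"],
    "x": [-121.50, -121.49, -122.01],
    "y": [38.58, 38.59, 37.34],
})

vp_direction = pd.DataFrame({
    "vp_idx": [0, 1, 2],
    "vp_primary_direction": ["Unknown", "Northbound", "Unknown"],
})

# Inner join mirrors the merge in merge_in_vp_direction.
vp_with_direction = vp.merge(vp_direction, on="vp_idx", how="inner")

# Partitioned export: one sub-folder per gtfs_dataset_key, as in the script.
vp_with_direction.to_parquet(
    "vp_usable_2025-06-11",  # hypothetical local path
    partition_cols=["gtfs_dataset_key"],
)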