
Conversation

@ian-r-rose
Member

Follow-on work from #477

I've been looking at using the "microbatch" incremental strategy for our large incremental models, and think it would be a nice simplification to a lot of the patterns here (such as being able to delete make_model_incremental) as well as enabling easier backfills of missing data. More thoughts on this are in #477. In this PR I've been taking an initial look at implementation, and wanted to try to get some thoughts from @thehanggit before he leaves Caltrans.

For most of our incremental models converting to the microbatch strategy is pretty straightforward (see, e.g., here). But the outlier removal model required a bit more thought, because it always compares with the last seven days' data to identify outliers. I don't really like this approach for a few reasons:

  • It's not idempotent, in that if I run the model on different days, I'll get different results.
  • It won't work if a station has been decommissioned, or otherwise hasn't produced any data for the last week.
  • It doesn't really fit well into the microbatch execution model, where we want to be able to run different days' data independently of each other (this is more of a conceptual cleanliness issue than a technical one).

So, the bulk of this PR is actually doing some refactoring of some of the early data processing for the clearinghouse data. Specifically, I do the following:

  1. Compute detector statistics at regular (quarterly) intervals for use in outlier removal. This is similar to what we do for the regression coefficients. When identifying outliers, we use an asof join to get the most recent statistics for a detector (see the sketch after this list).
  2. Consolidate several of the data cleaning/preparation steps into a single model, including
    • Removing outliers
    • Filling out missing timestamps using the timestamp_spine macro that @JamesSLogan created
    • Attaching station metadata (still WIP)
  3. Move the computation of the "g factor" speed to after this data preparation step (still WIP). Previously this was done after outlier removal, but before the timestamp spine. Conceptually, it made more sense to me to compute this derived data column after the data prep, but perhaps there is some reason to do it before? @thehanggit I would appreciate any thoughts you have here.
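
For reference, the asof join in step 1 would look roughly like this (a sketch only; the table aliases and the calculated_date column name are assumptions, not the actual schema):

-- sketch: attach the most recent quarterly statistics to each five-minute row
select
    agg.detector_id,
    agg.sample_date,
    agg.volume_sum,
    thresholds.volume_95th,
    thresholds.occupancy_95th
from five_minute_agg as agg
asof join detector_outlier_thresholds as thresholds
    match_condition (agg.sample_date >= thresholds.calculated_date)
    on agg.detector_id = thresholds.detector_id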

1. Converts the missing_rows model to microbatch
2. Removes an unnecessary join to add station metadata to the model.
    This data already exists on the active_stations model, and can be
    brought along in the spine computation.
3. Removes an unnecessary join to the g_factor speed model. I'm pretty
   sure we can compute the g factor speed downstream of this model
   without any joins.
4. Removes unnecessary coalesce statements: the metadata we are
   coalescing comes from the same place. If there are any discrepancies,
   it would be due to a bad join, which we would want to fix instead of
   coalescing.
Member Author

This model is consolidated into int_vds__detector_agg_five_minutes_normalized

Comment on lines -53 to -64
-- calculate the statistics
weekly_stats as (
    select
        detector_id,
        avg(volume_sum) as volume_mean,
        stddev(volume_sum) as volume_stddev,
        -- consider using max_capacity
        percentile_cont(0.95) within group (order by volume_sum) as volume_95th,
        percentile_cont(0.95) within group (order by occupancy_avg) as occupancy_95th
    from filtered_five_minute_agg_lastweek
    group by detector_id
),
Member Author

This (and above) is what I've tried to re-implement in an idempotent way in int_diagnostic__detector_outlier_thresholds
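
A rough idea of the idempotent re-implementation: compute the same statistics per detector for each fixed quarterly window, keyed by the window start, so the result doesn't depend on when the model runs (a simplified sketch, not the actual model):

-- sketch: quarterly detector statistics, keyed by the quarter start date
quarterly_stats as (
    select
        detector_id,
        date_trunc('quarter', sample_date) as calculated_date,
        avg(volume_sum) as volume_mean,
        stddev(volume_sum) as volume_stddev,
        percentile_cont(0.95) within group (order by volume_sum) as volume_95th,
        percentile_cont(0.95) within group (order by occupancy_avg) as occupancy_95th
    from filtered_five_minute_agg
    group by detector_id, date_trunc('quarter', sample_date)
),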

),

-- impute detected outliers
outlier_removed_data as (
Member Author

This is consolidated into int_vds__detector_agg_five_minutes_normalized

Member Author

Nice example of the simple case for microbatch
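
For anyone following along, the simple case amounts to replacing the incremental boilerplate with a config block along these lines (a sketch; the column name, begin date, and upstream model name are placeholders, not the actual file):

{{
    config(
        materialized="incremental",
        incremental_strategy="microbatch",
        event_time="sample_date",
        batch_size="day",
        begin="2023-01-01",
    )
}}

-- dbt filters any ref() that has an event_time config down to the current
-- batch's window, so no is_incremental() or lookback logic is needed here
select *
from {{ ref("upstream_model") }}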

Contributor

@JamesSLogan left a comment

🙌 Very nice, @ian-r-rose! I don't see any glaring issues in the models.

Comment on lines 80 to 83
inner join good_detectors
    on
        agg.detector_id = good_detectors.detector_id
        and agg.sample_date = good_detectors.sample_date
Contributor

This doesn't necessarily need to be changed (unless performance is improved?), but doing this filter as WHERE EXISTS would show the query's intent more explicitly.

Member Author

Thanks for the suggestion, @JamesSLogan! I'm not familiar with this particular style of filter join; can you give an example?

Contributor

I think something like the below would perform the filter in a similar manner to the current join to good_detectors. The idea is that the database doesn't have to build a large intermediate result set from the join and can instead simply search good_detectors, stopping as soon as a match is found. This, of course, depends on how Snowflake optimizes the query...

select
    ...
from agg
inner join agg_dates_to_evaluate
    on
        agg.sample_date >= agg_dates_to_evaluate.agg_date
        and agg.sample_date
        < dateadd(day, {{ var("outlier_agg_time_window") }}, agg_dates_to_evaluate.agg_date)
where
    agg.station_type in ('ML', 'HV')
    and exists (
        select 1
        from good_detectors gd
        where
            gd.detector_id = agg.detector_id
            and gd.sample_date = agg.sample_date
    )

There could be a difference in the output rows if the agg -> good_detectors relationship is one-to-many, which I don't think is the case here.

Member Author

Interesting...would this be considered a correlated subquery? I've never quite felt smart enough for those...

I'm going to test this to see if I can find a difference in the query plan and performance, thanks!

Contributor

Yes, and I feel the same! They're typically harder to read but this case would better convey intent (slightly) and maybe even improve performance 🤞

Member Author

I took a look at this, and just documenting what I saw here:

There is almost no difference in performance between these two approaches: the query plans look almost identical, and both involve effectively the same join. This particular join is a small fraction of the total runtime of the query. There are two minor differences I noticed in the query plan:

  • First, the join type in the query plan for the correlated subquery is "semi" instead of "inner". From what I understand from searching around, a subquery used in this way is effectively synonymous with a "semi-join".
  • Second, there is a pre-aggregation in the correlated subquery to ensure there is a single detector_id and sample_date for the existence check. If I understand your comment correctly, this is what you were referring to above with the one->many join.

Here is a snapshot of the inner join query plan:

[screenshot: inner join query plan]

And here is the correlated subquery:

[screenshot: correlated subquery query plan]

Happy to take this suggestion to convey the intent of the join, thanks for the knowledge sharing @JamesSLogan!

@thehanggit
Contributor


@ian-r-rose Hi Ian, for the outlier detection data processing, it's a great idea to consolidate it into a single data processing model!

For the gspeed, I talked with Ken today, and the only concern we brought up is performance (please check PR #502). 1. In the 5-minute dataset with missing rows filled in, the table would be much larger than the dataset without them; I'm not sure whether that would cause a significant delay. 2. The dataset with missing rows generates a large number of NULL values in flow and occ. I checked the gfactor speed calculation and it should already exclude them, but it's still unclear how they would affect the gfactor result, so I would add a QC check to verify the difference. Other than that, there are no other reasons.
One thing worth mentioning: the outlier detection only generates a new column labeling rows as "observed outlier" or "observed data" for each detector; it doesn't filter out the rows classified as outliers. But it's pretty easy to remove them in downstream models with a WHERE clause (see the example below).
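
For example, something like this in a downstream model (the model and status column names here are just placeholders):

-- keep only rows that were not classified as outliers
select *
from {{ ref("detector_agg_with_outlier_labels") }}
where detector_status = 'observed data'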

@ian-r-rose
Member Author

My suspicion is that the extra cost of computing gspeed for the missing rows is less than the cost of joining two large tables. In general, joins are more expensive and less parallelizable than operations that can happen on a single row, and filling missing rows seems to mostly increase the table sizes by ~10%.

@thehanggit
Contributor

I agree with your claim. Please feel free to move the gspeed model after data prep.

incremental_model_look_back variables. We don't need them anymore as
they are replaced by microbatch configuration!
Comment on lines +48 to +49
config:
  event_time: SAMPLE_DATE
Member Author

This is pretty crucial! The raw view here is not an incremental model, but we still set an event_time config on it so that downstream microbatch models know how to filter on it. So, all microbatch models have an event_time config, and some non-microbatch ones need it as well.
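
Concretely, when a downstream microbatch model selects from this view, dbt wraps the ref in a per-batch filter on the configured event_time column, roughly like the following (illustrative only; the relation name is a placeholder and the dates are example batch bounds):

select * from (
    select *
    from analytics.staging.raw_clearinghouse_view  -- placeholder relation
    -- dbt injects this filter for each batch using the event_time column
    where sample_date >= '2025-06-01'
        and sample_date < '2025-06-02'
)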

ian-r-rose added 11 commits June 6, 2025 14:26
macros like get_snowflake_refresh_warehouse(). Disable
concurrent_batches because DELETE DML statements lock the table, so
batch concurrency is pretty useless unless Snowflake starts being more
partition-aware.
imputation layer. By organizing all of the models that look back in time
together, we can handle them specially during backfills.
@ian-r-rose marked this pull request as ready for review August 5, 2025 20:35