Microbatch implementation #558
base: main
Conversation
intervals, allowing for outlier detection
detector statistics
1. Converts the missing_rows model to microbatch
2. Removes an unnecessary join to add station metadata to the model.
    This data already exists on the active_stations model, and can be
    brought along in the spine computation.
3. Removes an unnecessary join to the g_factor speed model. I'm pretty
   sure we can compute the g factor speed downstream of this model
   without any joins.
4. Removes unnecessary coalesce statements: the metadata we are
   coalescing comes from the same place. If there are any discrepancies,
   it would be due to a bad join, which we would want to fix instead of
   coalescing.
This model is consolidated into int_vds__detector_agg_five_minutes_normalized
-- calculate the statistics
weekly_stats as (
    select
        detector_id,
        avg(volume_sum) as volume_mean,
        stddev(volume_sum) as volume_stddev,
        -- consider using max_capacity
        percentile_cont(0.95) within group (order by volume_sum) as volume_95th,
        percentile_cont(0.95) within group (order by occupancy_avg) as occupancy_95th
    from filtered_five_minute_agg_lastweek
    group by detector_id
),
This (and above) is what I've tried to re-implement in an idempotent way in int_diagnostic__detector_outlier_thresholds
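For illustration, here is a minimal sketch of the idempotent shape this could take: the same statistics as above, but keyed by sample_date and computed over a trailing window relative to that date, so rerunning any given batch reproduces the same thresholds. The dates spine and the exact 7-day window are assumptions for the sketch, not necessarily what int_diagnostic__detector_outlier_thresholds actually does.

-- Sketch only: thresholds keyed by sample_date so each batch is reproducible.
-- The "dates" spine and the 7-day trailing window are illustrative assumptions.
detector_outlier_thresholds as (
    select
        agg.detector_id,
        dates.sample_date,
        avg(agg.volume_sum) as volume_mean,
        stddev(agg.volume_sum) as volume_stddev,
        percentile_cont(0.95) within group (order by agg.volume_sum) as volume_95th,
        percentile_cont(0.95) within group (order by agg.occupancy_avg) as occupancy_95th
    from dates
    inner join agg
        on
            agg.sample_date < dates.sample_date
            and agg.sample_date >= dateadd(day, -7, dates.sample_date)
    group by agg.detector_id, dates.sample_date
)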
),

-- impute detected outliers
outlier_removed_data as (
This is consolidated into int_vds__detector_agg_five_minutes_normalized
Nice example of the simple case for microbatch
🙌 very nice @ian-r-rose ! I don't see any glaring issues in the models.
inner join good_detectors
    on
        agg.detector_id = good_detectors.detector_id
        and agg.sample_date = good_detectors.sample_date
This doesn't necessarily need to be changed (unless performance is improved?), but doing this filter as WHERE EXISTS would show the query's intent more explicitly.
Thanks for the suggestion @JamesSLogan! I'm not familiar with this particular style of filter joins, can you give an example?
I think something like the below would perform the filter in a similar manner to the current join to good_detectors. The idea is that the database doesn't have to build a large intermediate result set from the join and can instead simply search good_detectors, stopping as soon as a match is found. This, of course, depends on how Snowflake optimizes the query...
select
    ...
from agg
inner join agg_dates_to_evaluate
    on
        agg.sample_date >= agg_dates_to_evaluate.agg_date
        and agg.sample_date
        < dateadd(day, {{ var("outlier_agg_time_window") }}, agg_dates_to_evaluate.agg_date)
where
    agg.station_type in ('ML', 'HV')
    and exists (
        select 1
        from good_detectors gd
        where
            gd.detector_id = agg.detector_id
            and gd.sample_date = agg.sample_date
    )

There could be a difference in output rows if agg -> good_detectors is one-to-many, which I don't think is the case here.
Interesting...would this be considered a correlated subquery? I've never quite felt smart enough for those...
I'm going to test this to see if I can find a difference in the query plan and performance, thanks!
Yes, and I feel the same! They're typically harder to read but this case would better convey intent (slightly) and maybe even improve performance 🤞
I took a look at this, and just documenting what I saw here:
There is almost no difference in performance between these two approaches: the query plans look almost identical, and both involve effectively the same join. This particular join is a small fraction of the total runtime of the query. There are two minor differences I noticed in the query plan:
- First, the join type in the query plan for the correlated subquery is "semi" instead of "inner". From what I understand from searching around, a subquery used in this way is effectively synonymous with a "semi-join".
- Second, there is a pre-aggregation in the correlated subquery to ensure there is a single detector_id and sample_date pair for the existence check. If I understand your comment correctly, this is what you were referring to above about the one-to-many join.
Here is a snapshot of the inner join query plan: [screenshot]
And here is the correlated subquery: [screenshot]
Happy to take this suggestion to convey the intent of the join, thanks for the knowledge sharing @JamesSLogan!
I still want to move the 60th percentile occupancy calculation into the thresholds model so that all data cleanup happens in the same place.
@ian-r-rose Hi Ian, for the outlier detection data processing, it's a great idea to consolidate it into a single data processing model! For the gspeed, I talked with Ken today, and the only reason we brought it onto the table is performance (please check PR #502). 1. In the 5-minute dataset with missing rows, it would make the table much larger compared to the dataset without missing rows; I'm not sure if that will cause a significant delay. 2. The dataset with missing rows generates a large amount of
My suspicion is that the extra cost of computing gspeed for the missing rows is less than the cost of joining two large tables. In general, joins are more expensive and less parallelizable than operations that can happen on a single row, and filling missing rows seems to mostly increase the table sizes by ~10%.
I agree with your claim. Please feel free to move the gspeed model after data prep.
incremental_model_look_back variables. We don't need them anymore as they are replaced by microbatch configuration!
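For reference, a rough sketch of the kind of microbatch config that takes the place of those look-back variables (the specific values shown are illustrative assumptions, not the project's actual settings):

-- Sketch of a dbt microbatch model config; values are illustrative only.
{{ config(
    materialized="incremental",
    incremental_strategy="microbatch",
    event_time="sample_date",
    batch_size="day",
    lookback=1,
    begin="2023-01-01"
) }}

With this, dbt handles the "how far back do we reprocess" question per batch, rather than each model carrying its own look-back variable.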
back a bit further, even in dev.
overridden in an apparent dbt bug: dbt-labs/dbt-core#6013. It's a bit more verbose, but at least it's explicit.
config:
  event_time: SAMPLE_DATE
This is pretty crucial! The raw view here is not an incremental model, but we still set an event_time config on it so that downstream microbatch models know how to filter on it. So, all microbatch models have an event_time config, and some non-microbatch ones need it as well.
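Roughly speaking (a sketch of the behavior, not dbt's exact generated SQL), when a downstream microbatch model selects from this view, dbt wraps the reference in a per-batch filter on the event_time column, something like:

-- Approximate shape of the per-batch filter dbt applies to an upstream relation
-- that has event_time set; the dates are placeholders for one daily batch.
select * from raw_view
where sample_date >= '2024-01-01'
    and sample_date < '2024-01-02'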
back further than that in dev
macros like get_snowflake_refresh_warehouse(). Disable concurrent_batches because DELETE DML statements lock the table, so batch concurrency is pretty useless unless Snowflake starts being more partition-aware.
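A minimal sketch of what that looks like in the model config (values other than concurrent_batches are illustrative):

-- Sketch only: run microbatch batches serially because DELETEs lock the table.
{{ config(
    materialized="incremental",
    incremental_strategy="microbatch",
    event_time="sample_date",
    batch_size="day",
    concurrent_batches=false
) }}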
imputation layer. By organizing all of the models that look back in time together, we can handle them specially during backfills
Follow-on work from #477
I've been looking at using the "microbatch" incremental strategy for our large incremental models, and think it would be a nice simplification to a lot of the patterns here (such as being able to delete make_model_incremental), as well as enabling easier backfills of missing data. More thoughts on this are in #477. In this PR I've been taking an initial look at implementation, and wanted to try to get some thoughts from @thehanggit before he leaves Caltrans.

For most of our incremental models, converting to the microbatch strategy is pretty straightforward (see, e.g., here). But the outlier removal model required a bit more thought, because it always compares with the last seven days' data to identify outliers. I don't really like this approach for a few reasons:
So, the bulk of this PR is actually doing some refactoring of some of the early data processing for the clearinghouse data. Specifically, I do the following:
asof join to get the most recent statistics for a detector.
timestamp_spine macro that @JamesSLogan created
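For reference, a minimal sketch of a Snowflake asof join picking up the most recent thresholds for each aggregated row (table and column names are assumptions for illustration, not the actual models):

-- Sketch only: attach the latest thresholds at or before each row's sample_date.
select
    agg.detector_id,
    agg.sample_date,
    agg.volume_sum,
    thresholds.volume_95th,
    thresholds.occupancy_95th
from detector_agg_five_minutes as agg
asof join detector_outlier_thresholds as thresholds
    match_condition (agg.sample_date >= thresholds.sample_date)
    on agg.detector_id = thresholds.detector_id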