Skip to content

Commit e53841b

Browse files
committed
Fix Sat macro and Pipfile
Sat macro needed COALESCE(LAG()..) only for BigQuery
1 parent f30dac3 commit e53841b

File tree

1 file changed

+54
-15
lines changed

1 file changed

+54
-15
lines changed

macros/tables/snowflake/sat.sql

Lines changed: 54 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -84,29 +84,68 @@ valid_stg AS (
8484

8585
{%- endif %}
8686

87+
{%- set is_incremental = automate_dv.is_any_incremental() %}
88+
{%- set use_valid_stg = is_incremental and apply_source_filter %}
89+
{%- set source_table = 'valid_stg AS sd' if use_valid_stg else 'source_data AS sd' %}
90+
{%- set hashdiff_alias = automate_dv.prefix([src_hashdiff], 'sd', alias_target='source') %}
91+
{%- set lag_default = automate_dv.cast_binary('FFFFFFFF', quote=true) %}
92+
{%- set partition_by = automate_dv.prefix([src_pk], 'sd', alias_target='source') %}
93+
{%- set order_by = automate_dv.prefix([src_ldts], 'sd', alias_target='source') %}
94+
{%- set order_by_eff = automate_dv.prefix([src_eff], 'sd', alias_target='source') %}
95+
{%- set is_bigquery = target.type == 'bigquery' %}
96+
{%- set use_eff = automate_dv.is_something([src_eff]) %}
97+
{#- BigQuery does not support a 3-arg LAG() where the third arg is an expression, it must be a constant. Workaround below #}
98+
8799
unique_source_records AS (
88100
SELECT
89101
{{ automate_dv.prefix(source_cols, 'sd', alias_target='source') }}
90-
{%- if automate_dv.is_any_incremental() and apply_source_filter %}
91-
FROM valid_stg AS sd
92-
{%- else %}
93-
FROM source_data AS sd
94-
{%- endif %}
95-
{%- if automate_dv.is_any_incremental() %}
102+
FROM {{ source_table }}
103+
{%- if is_incremental %}
96104
LEFT OUTER JOIN latest_records AS lr
97105
ON {{ automate_dv.multikey(src_pk, prefix=['sd','lr'], condition='=') }}
98-
QUALIFY {{ automate_dv.prefix([src_hashdiff], 'sd', alias_target='source') }} != LAG({{ automate_dv.prefix([src_hashdiff], 'sd', alias_target='source') }}, 1, COALESCE( {{ automate_dv.prefix([src_hashdiff], 'lr', alias_target='source') }}, {{ automate_dv.cast_binary('FFFFFFFF', quote=true) }})) OVER (
99-
{%- else %}
100-
QUALIFY {{ automate_dv.prefix([src_hashdiff], 'sd', alias_target='source') }} != LAG({{ automate_dv.prefix([src_hashdiff], 'sd', alias_target='source') }}, 1, {{ automate_dv.cast_binary('FFFFFFFF', quote=true) }}) OVER (
101106
{%- endif %}
102-
PARTITION BY {{ automate_dv.prefix([src_pk], 'sd', alias_target='source') }}
103-
{%- if automate_dv.is_something([src_eff]) %}
104-
ORDER BY {{ automate_dv.prefix([src_ldts], 'sd', alias_target='source') }} ASC,
105-
{{ automate_dv.prefix([src_eff], 'sd', alias_target='source') }} ASC
107+
108+
QUALIFY {{ hashdiff_alias }} !=
109+
{%- if is_incremental and is_bigquery %}
110+
COALESCE(
111+
LAG({{ hashdiff_alias }}, 1) OVER (
112+
PARTITION BY {{ partition_by }}
113+
{%- if use_eff %}
114+
ORDER BY {{ order_by }} ASC,
115+
{{ order_by_eff }} ASC
116+
{%- else %}
117+
ORDER BY {{ order_by }} ASC
118+
{%- endif %}
119+
),
120+
{{ automate_dv.prefix([src_hashdiff], 'lr', alias_target='target') }},
121+
{{ lag_default }}
122+
)
123+
{%- elif is_incremental and not is_bigquery %}
124+
LAG({{ hashdiff_alias }}, 1,
125+
COALESCE(
126+
{{ automate_dv.prefix([src_hashdiff], 'lr', alias_target='target') }},
127+
{{ lag_default }}
128+
)
129+
) OVER (
130+
PARTITION BY {{ partition_by }}
131+
{%- if use_eff %}
132+
ORDER BY {{ order_by }} ASC,
133+
{{ order_by_eff }} ASC
134+
{%- else %}
135+
ORDER BY {{ order_by }} ASC
136+
{%- endif %}
137+
)
106138
{%- else %}
107-
ORDER BY {{ automate_dv.prefix([src_ldts], 'sd', alias_target='source') }} ASC
139+
LAG({{ hashdiff_alias }}, 1, {{ lag_default }}) OVER (
140+
PARTITION BY {{ partition_by }}
141+
{%- if use_eff %}
142+
ORDER BY {{ order_by }} ASC,
143+
{{ order_by_eff }} ASC
144+
{%- else %}
145+
ORDER BY {{ order_by }} ASC
146+
{%- endif %}
147+
)
108148
{%- endif %}
109-
)
110149
),
111150

112151
{%- if enable_ghost_record %}

0 commit comments

Comments
 (0)