1
+ {%- macro bigquery__eff_sat(src_pk, src_dfk, src_sfk, src_start_date, src_end_date, src_eff, src_ldts, src_source, source_model) - %}
2
+
3
+ {{- dbtvault .check_required_parameters (src_pk= src_pk, src_dfk= src_dfk, src_sfk= src_sfk,
4
+ src_start_date= src_start_date, src_end_date= src_end_date,
5
+ src_eff= src_eff, src_ldts= src_ldts, src_source= src_source,
6
+ source_model= source_model) - }}
7
+
8
+ {%- set src_pk = dbtvault .escape_column_names (src_pk) - %}
9
+ {%- set src_dfk = dbtvault .escape_column_names (src_dfk) - %}
10
+ {%- set src_sfk = dbtvault .escape_column_names (src_sfk) - %}
11
+ {%- set src_start_date = dbtvault .escape_column_names (src_start_date) - %}
12
+ {%- set src_end_date = dbtvault .escape_column_names (src_end_date) - %}
13
+ {%- set src_eff = dbtvault .escape_column_names (src_eff) - %}
14
+ {%- set src_ldts = dbtvault .escape_column_names (src_ldts) - %}
15
+ {%- set src_source = dbtvault .escape_column_names (src_source) - %}
16
+
17
+ {%- set source_cols = dbtvault .expand_column_list (columns= [src_pk, src_dfk, src_sfk, src_start_date, src_end_date, src_eff, src_ldts, src_source]) - %}
18
+ {%- set fk_cols = dbtvault .expand_column_list (columns= [src_dfk, src_sfk]) - %}
19
+ {%- set dfk_cols = dbtvault .expand_column_list (columns= [src_dfk]) - %}
20
+ {%- set is_auto_end_dating = config .get (' is_auto_end_dating' , default= false) %}
21
+
22
+ {%- set max_datetime = var(' max_datetime' , ' 9999-12-31 23:59:59.999999' ) %}
23
+
24
+ {{- dbtvault .prepend_generated_by () }}
25
+
26
+ WITH source_data AS (
27
+ SELECT {{ dbtvault .prefix (source_cols, ' a' , alias_target= ' source' ) }}
28
+ FROM {{ ref(source_model) }} AS a
29
+ WHERE {{ dbtvault .multikey (src_dfk, prefix= ' a' , condition= ' IS NOT NULL' ) }}
30
+ AND {{ dbtvault .multikey (src_sfk, prefix= ' a' , condition= ' IS NOT NULL' ) }}
31
+ {%- if model .config .materialized == ' vault_insert_by_period' %}
32
+ AND __PERIOD_FILTER__
33
+ {%- elif model .config .materialized == ' vault_insert_by_rank' %}
34
+ AND __RANK_FILTER__
35
+ {%- endif %}
36
+ ),
37
+
38
+ {%- if dbtvault .is_any_incremental () %}
39
+
40
+ {# Selecting the most recent records for each link hashkey -#}
41
+ latest_records_unranked AS (
42
+ SELECT {{ dbtvault .alias_all (source_cols, ' b' ) }},
43
+ ROW_NUMBER() OVER (
44
+ PARTITION BY {{ dbtvault .prefix ([src_pk], ' b' ) }}
45
+ ORDER BY b.{{ src_ldts }} DESC
46
+ ) AS row_num
47
+ FROM {{ this }} AS b
48
+ ),
49
+
50
+ latest_records AS (
51
+ SELECT *
52
+ FROM latest_records_unranked
53
+ WHERE row_num = 1
54
+ ),
55
+
56
+ {# Selecting the open records of the most recent records for each link hashkey -#}
57
+ latest_open AS (
58
+ SELECT {{ dbtvault .alias_all (source_cols, ' c' ) }}
59
+ FROM latest_records AS c
60
+ WHERE DATE (c.{{ src_end_date }}) = CAST(PARSE_DATETIME(' %F %H:%M:%E6S' , ' {{ max_datetime }}' ) AS DATE )
61
+ ),
62
+
63
+ {# Selecting the closed records of the most recent records for each link hashkey -#}
64
+ latest_closed AS (
65
+ SELECT {{ dbtvault .alias_all (source_cols, ' d' ) }}
66
+ FROM latest_records AS d
67
+ WHERE DATE (d.{{ src_end_date }}) != CAST(PARSE_DATETIME(' %F %H:%M:%E6S' , ' {{ max_datetime }}' ) AS DATE )
68
+ ),
69
+
70
+ {# Identifying the completely new link relationships to be opened in eff sat -#}
71
+ new_open_records AS (
72
+ SELECT DISTINCT
73
+ {{ dbtvault .alias_all (source_cols, ' f' ) }}
74
+ FROM source_data AS f
75
+ LEFT JOIN latest_records AS lr
76
+ ON {{ dbtvault .multikey (src_pk, prefix= [' f' ,' lr' ], condition= ' =' ) }}
77
+ WHERE {{ dbtvault .multikey (src_pk, prefix= ' lr' , condition= ' IS NULL' ) }}
78
+ ),
79
+
80
+ {# Identifying the currently closed link relationships to be reopened in eff sat -#}
81
+ new_reopened_records AS (
82
+ SELECT DISTINCT
83
+ {{ dbtvault .prefix ([src_pk], ' lc' ) }},
84
+ {{ dbtvault .alias_all (fk_cols, ' lc' ) }},
85
+ lc.{{ src_start_date }} AS {{ src_start_date }},
86
+ g.{{ src_end_date }} AS {{ src_end_date }},
87
+ g.{{ src_eff }} AS {{ src_eff }},
88
+ g.{{ src_ldts }},
89
+ g.{{ src_source }}
90
+ FROM source_data AS g
91
+ INNER JOIN latest_closed AS lc
92
+ ON {{ dbtvault .multikey (src_pk, prefix= [' g' ,' lc' ], condition= ' =' ) }}
93
+ WHERE CAST((g.{{ src_end_date }}) AS DATE ) = CAST(PARSE_DATETIME(' %F %H:%M:%E6S' , ' {{ max_datetime }}' ) AS DATE )
94
+ ),
95
+
96
+ {%- if is_auto_end_dating %}
97
+
98
+ {# Creating the closing records -#}
99
+ {# Identifying the currently open relationships that need to be closed due to change in SFK(s) -#}
100
+ new_closed_records AS (
101
+ SELECT DISTINCT
102
+ {{ dbtvault .prefix ([src_pk], ' lo' ) }},
103
+ {{ dbtvault .alias_all (fk_cols, ' lo' ) }},
104
+ lo.{{ src_start_date }} AS {{ src_start_date }},
105
+ h.{{ src_eff }} AS {{ src_end_date }},
106
+ h.{{ src_eff }} AS {{ src_eff }},
107
+ h.{{ src_ldts }},
108
+ lo.{{ src_source }}
109
+ FROM source_data AS h
110
+ INNER JOIN latest_open AS lo
111
+ ON {{ dbtvault .multikey (src_dfk, prefix= [' lo' , ' h' ], condition= ' =' ) }}
112
+ WHERE ({{ dbtvault .multikey (src_sfk, prefix= [' lo' , ' h' ], condition= ' <>' , operator= ' OR' ) }})
113
+ ),
114
+
115
+ {# - else if is_auto_end_dating -#}
116
+ {% else %}
117
+
118
+ new_closed_records AS (
119
+ SELECT DISTINCT
120
+ lo.{{ src_pk }},
121
+ {{ dbtvault .alias_all (fk_cols, ' lo' ) }},
122
+ lo.{{ src_start_date }} AS {{ src_start_date }},
123
+ h.{{ src_eff }} AS {{ src_end_date }},
124
+ h.{{ src_eff }} AS {{ src_eff }},
125
+ h.{{ src_ldts }},
126
+ lo.{{ src_source }}
127
+ FROM source_data AS h
128
+ LEFT JOIN Latest_open AS lo
129
+ ON lo.{{ src_pk }} = h.{{ src_pk }}
130
+ LEFT JOIN latest_closed AS lc
131
+ ON lc.{{ src_pk }} = h.{{ src_pk }}
132
+ WHERE CAST((h.{{ src_end_date }}) AS DATE ) != CAST(PARSE_DATETIME(' %F %H:%M:%E6S' , ' {{ max_datetime }}' ) AS DATE )
133
+ AND lo.{{ src_pk }} IS NOT NULL
134
+ AND lc.{{ src_pk }} IS NULL
135
+ ),
136
+
137
+ {# - end if is_auto_end_dating -#}
138
+ {%- endif %}
139
+
140
+ records_to_insert AS (
141
+ SELECT * FROM new_open_records
142
+ UNION DISTINCT
143
+ SELECT * FROM new_reopened_records
144
+ UNION DISTINCT
145
+ SELECT * FROM new_closed_records
146
+ )
147
+
148
+ {# - else if not dbtvault.is_any_incremental() -#}
149
+ {%- else %}
150
+
151
+ records_to_insert AS (
152
+ SELECT {{ dbtvault .alias_all (source_cols, ' i' ) }}
153
+ FROM source_data AS i
154
+ )
155
+
156
+ {# - end if not dbtvault.is_any_incremental() -#}
157
+ {%- endif %}
158
+
159
+ SELECT *
160
+ FROM records_to_insert
161
+
162
+ {%- endmacro - %}
0 commit comments