Skip to content

Commit 159b234

Browse files
Add hard_deletes config and new_record Option for Snapshots (#317)
1 parent d56ca34 commit 159b234

File tree

6 files changed

+344
-12
lines changed

6 files changed

+344
-12
lines changed
Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,6 @@
1+
kind: Features
2+
body: Add new hard_deletes="new_record" mode for snapshots.
3+
time: 2024-11-04T12:06:53.225939-05:00
4+
custom:
5+
Author: peterallenwebb
6+
Issue: "317"
Lines changed: 225 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,225 @@
1+
import pytest
2+
3+
from dbt.tests.util import check_relations_equal, run_dbt
4+
5+
_seed_new_record_mode = """
6+
create table {database}.{schema}.seed (
7+
id INTEGER,
8+
first_name VARCHAR(50),
9+
last_name VARCHAR(50),
10+
email VARCHAR(50),
11+
gender VARCHAR(50),
12+
ip_address VARCHAR(20),
13+
updated_at TIMESTAMP WITHOUT TIME ZONE
14+
);
15+
16+
create table {database}.{schema}.snapshot_expected (
17+
id INTEGER,
18+
first_name VARCHAR(50),
19+
last_name VARCHAR(50),
20+
email VARCHAR(50),
21+
gender VARCHAR(50),
22+
ip_address VARCHAR(20),
23+
24+
-- snapshotting fields
25+
updated_at TIMESTAMP WITHOUT TIME ZONE,
26+
dbt_valid_from TIMESTAMP WITHOUT TIME ZONE,
27+
dbt_valid_to TIMESTAMP WITHOUT TIME ZONE,
28+
dbt_scd_id TEXT,
29+
dbt_updated_at TIMESTAMP WITHOUT TIME ZONE,
30+
dbt_is_deleted TEXT
31+
);
32+
33+
34+
-- seed inserts
35+
-- use the same email for two users to verify that duplicated check_cols values
36+
-- are handled appropriately
37+
insert into {database}.{schema}.seed (id, first_name, last_name, email, gender, ip_address, updated_at) values
38+
(1, 'Judith', 'Kennedy', '(not provided)', 'Female', '54.60.24.128', '2015-12-24 12:19:28'),
39+
(2, 'Arthur', 'Kelly', '(not provided)', 'Male', '62.56.24.215', '2015-10-28 16:22:15'),
40+
(3, 'Rachel', 'Moreno', 'rmoreno2@msu.edu', 'Female', '31.222.249.23', '2016-04-05 02:05:30'),
41+
(4, 'Ralph', 'Turner', 'rturner3@hp.com', 'Male', '157.83.76.114', '2016-08-08 00:06:51'),
42+
(5, 'Laura', 'Gonzales', 'lgonzales4@howstuffworks.com', 'Female', '30.54.105.168', '2016-09-01 08:25:38'),
43+
(6, 'Katherine', 'Lopez', 'klopez5@yahoo.co.jp', 'Female', '169.138.46.89', '2016-08-30 18:52:11'),
44+
(7, 'Jeremy', 'Hamilton', 'jhamilton6@mozilla.org', 'Male', '231.189.13.133', '2016-07-17 02:09:46'),
45+
(8, 'Heather', 'Rose', 'hrose7@goodreads.com', 'Female', '87.165.201.65', '2015-12-29 22:03:56'),
46+
(9, 'Gregory', 'Kelly', 'gkelly8@trellian.com', 'Male', '154.209.99.7', '2016-03-24 21:18:16'),
47+
(10, 'Rachel', 'Lopez', 'rlopez9@themeforest.net', 'Female', '237.165.82.71', '2016-08-20 15:44:49'),
48+
(11, 'Donna', 'Welch', 'dwelcha@shutterfly.com', 'Female', '103.33.110.138', '2016-02-27 01:41:48'),
49+
(12, 'Russell', 'Lawrence', 'rlawrenceb@qq.com', 'Male', '189.115.73.4', '2016-06-11 03:07:09'),
50+
(13, 'Michelle', 'Montgomery', 'mmontgomeryc@scientificamerican.com', 'Female', '243.220.95.82', '2016-06-18 16:27:19'),
51+
(14, 'Walter', 'Castillo', 'wcastillod@pagesperso-orange.fr', 'Male', '71.159.238.196', '2016-10-06 01:55:44'),
52+
(15, 'Robin', 'Mills', 'rmillse@vkontakte.ru', 'Female', '172.190.5.50', '2016-10-31 11:41:21'),
53+
(16, 'Raymond', 'Holmes', 'rholmesf@usgs.gov', 'Male', '148.153.166.95', '2016-10-03 08:16:38'),
54+
(17, 'Gary', 'Bishop', 'gbishopg@plala.or.jp', 'Male', '161.108.182.13', '2016-08-29 19:35:20'),
55+
(18, 'Anna', 'Riley', 'arileyh@nasa.gov', 'Female', '253.31.108.22', '2015-12-11 04:34:27'),
56+
(19, 'Sarah', 'Knight', 'sknighti@foxnews.com', 'Female', '222.220.3.177', '2016-09-26 00:49:06'),
57+
(20, 'Phyllis', 'Fox', null, 'Female', '163.191.232.95', '2016-08-21 10:35:19');
58+
59+
60+
-- populate snapshot table
61+
insert into {database}.{schema}.snapshot_expected (
62+
id,
63+
first_name,
64+
last_name,
65+
email,
66+
gender,
67+
ip_address,
68+
updated_at,
69+
dbt_valid_from,
70+
dbt_valid_to,
71+
dbt_updated_at,
72+
dbt_scd_id,
73+
dbt_is_deleted
74+
)
75+
76+
select
77+
id,
78+
first_name,
79+
last_name,
80+
email,
81+
gender,
82+
ip_address,
83+
updated_at,
84+
-- fields added by snapshotting
85+
updated_at as dbt_valid_from,
86+
null::timestamp as dbt_valid_to,
87+
updated_at as dbt_updated_at,
88+
md5(id || '-' || first_name || '|' || updated_at::text) as dbt_scd_id,
89+
'False' as dbt_is_deleted
90+
from {database}.{schema}.seed;
91+
"""
92+
93+
_snapshot_actual_sql = """
94+
{% snapshot snapshot_actual %}
95+
96+
{{
97+
config(
98+
unique_key='id || ' ~ "'-'" ~ ' || first_name',
99+
)
100+
}}
101+
102+
select * from {{target.database}}.{{target.schema}}.seed
103+
104+
{% endsnapshot %}
105+
"""
106+
107+
_snapshots_yml = """
108+
snapshots:
109+
- name: snapshot_actual
110+
config:
111+
strategy: timestamp
112+
updated_at: updated_at
113+
hard_deletes: new_record
114+
"""
115+
116+
_ref_snapshot_sql = """
117+
select * from {{ ref('snapshot_actual') }}
118+
"""
119+
120+
121+
_invalidate_sql = """
122+
-- update records 11 - 21. Change email and updated_at field
123+
update {schema}.seed set
124+
updated_at = updated_at + interval '1 hour',
125+
email = case when id = 20 then 'pfoxj@creativecommons.org' else 'new_' || email end
126+
where id >= 10 and id <= 20;
127+
128+
129+
-- invalidate records 11 - 21
130+
update {schema}.snapshot_expected set
131+
dbt_valid_to = updated_at + interval '1 hour'
132+
where id >= 10 and id <= 20;
133+
134+
"""
135+
136+
_update_sql = """
137+
-- insert v2 of the 11 - 21 records
138+
139+
insert into {database}.{schema}.snapshot_expected (
140+
id,
141+
first_name,
142+
last_name,
143+
email,
144+
gender,
145+
ip_address,
146+
updated_at,
147+
dbt_valid_from,
148+
dbt_valid_to,
149+
dbt_updated_at,
150+
dbt_scd_id,
151+
dbt_is_deleted
152+
)
153+
154+
select
155+
id,
156+
first_name,
157+
last_name,
158+
email,
159+
gender,
160+
ip_address,
161+
updated_at,
162+
-- fields added by snapshotting
163+
updated_at as dbt_valid_from,
164+
null::timestamp as dbt_valid_to,
165+
updated_at as dbt_updated_at,
166+
md5(id || '-' || first_name || '|' || updated_at::text) as dbt_scd_id,
167+
'False' as dbt_is_deleted
168+
from {database}.{schema}.seed
169+
where id >= 10 and id <= 20;
170+
"""
171+
172+
_delete_sql = """
173+
delete from {schema}.seed where id = 1
174+
"""
175+
176+
177+
class SnapshotNewRecordMode:
178+
@pytest.fixture(scope="class")
179+
def snapshots(self):
180+
return {"snapshot.sql": _snapshot_actual_sql}
181+
182+
@pytest.fixture(scope="class")
183+
def models(self):
184+
return {
185+
"snapshots.yml": _snapshots_yml,
186+
"ref_snapshot.sql": _ref_snapshot_sql,
187+
}
188+
189+
@pytest.fixture(scope="class")
190+
def seed_new_record_mode(self):
191+
return _seed_new_record_mode
192+
193+
@pytest.fixture(scope="class")
194+
def invalidate_sql(self):
195+
return _invalidate_sql
196+
197+
@pytest.fixture(scope="class")
198+
def update_sql(self):
199+
return _update_sql
200+
201+
@pytest.fixture(scope="class")
202+
def delete_sql(self):
203+
return _delete_sql
204+
205+
def test_snapshot_new_record_mode(
206+
self, project, seed_new_record_mode, invalidate_sql, update_sql
207+
):
208+
project.run_sql(seed_new_record_mode)
209+
results = run_dbt(["snapshot"])
210+
assert len(results) == 1
211+
212+
project.run_sql(invalidate_sql)
213+
project.run_sql(update_sql)
214+
215+
results = run_dbt(["snapshot"])
216+
assert len(results) == 1
217+
218+
check_relations_equal(project.adapter, ["snapshot_actual", "snapshot_expected"])
219+
220+
project.run_sql(_delete_sql)
221+
222+
results = run_dbt(["snapshot"])
223+
assert len(results) == 1
224+
225+
# TODO: Further validate results.

dbt/adapters/base/impl.py

Lines changed: 55 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -206,6 +206,14 @@ class FreshnessResponse(TypedDict):
206206
age: float # age in seconds
207207

208208

209+
class SnapshotStrategy(TypedDict):
210+
unique_key: Optional[str]
211+
updated_at: Optional[str]
212+
row_changed: Optional[str]
213+
scd_id: Optional[str]
214+
hard_deletes: Optional[str]
215+
216+
209217
class BaseAdapter(metaclass=AdapterMeta):
210218
"""The BaseAdapter provides an abstract base class for adapters.
211219
@@ -795,8 +803,8 @@ def valid_snapshot_target(
795803
columns = self.get_columns_in_relation(relation)
796804
names = set(c.name.lower() for c in columns)
797805
missing = []
798-
# Note: we're not checking dbt_updated_at here because it's not
799-
# always present.
806+
# Note: we're not checking dbt_updated_at or dbt_is_deleted here because they
807+
# aren't always present.
800808
for column in ("dbt_scd_id", "dbt_valid_from", "dbt_valid_to"):
801809
desired = column_names[column] if column_names else column
802810
if desired not in names:
@@ -805,6 +813,28 @@ def valid_snapshot_target(
805813
if missing:
806814
raise SnapshotTargetNotSnapshotTableError(missing)
807815

816+
@available.parse_none
817+
def assert_valid_snapshot_target_given_strategy(
818+
self, relation: BaseRelation, column_names: Dict[str, str], strategy: SnapshotStrategy
819+
) -> None:
820+
# Assert everything we can with the legacy function.
821+
self.valid_snapshot_target(relation, column_names)
822+
823+
# Now do strategy-specific checks.
824+
# TODO: Make these checks more comprehensive.
825+
if strategy.get("hard_deletes", None) == "new_record":
826+
columns = self.get_columns_in_relation(relation)
827+
names = set(c.name.lower() for c in columns)
828+
missing = []
829+
830+
for column in ("dbt_is_deleted",):
831+
desired = column_names[column] if column_names else column
832+
if desired not in names:
833+
missing.append(desired)
834+
835+
if missing:
836+
raise SnapshotTargetNotSnapshotTableError(missing)
837+
808838
@available.parse_none
809839
def expand_target_column_types(
810840
self, from_relation: BaseRelation, to_relation: BaseRelation
@@ -1795,6 +1825,29 @@ def _get_adapter_specific_run_info(cls, config) -> Dict[str, Any]:
17951825
"""
17961826
return {}
17971827

1828+
@available.parse_none
1829+
@classmethod
1830+
def get_hard_deletes_behavior(cls, config):
1831+
"""Check the hard_deletes config enum, and the legacy invalidate_hard_deletes
1832+
config flag in order to determine which behavior should be used for deleted
1833+
records in a snapshot. The default is to ignore them."""
1834+
invalidate_hard_deletes = config.get("invalidate_hard_deletes", None)
1835+
hard_deletes = config.get("hard_deletes", None)
1836+
1837+
if invalidate_hard_deletes is not None and hard_deletes is not None:
1838+
raise DbtValidationError(
1839+
"You cannot set both the invalidate_hard_deletes and hard_deletes config properties on the same snapshot."
1840+
)
1841+
1842+
if invalidate_hard_deletes or hard_deletes == "invalidate":
1843+
return "invalidate"
1844+
elif hard_deletes == "new_record":
1845+
return "new_record"
1846+
elif hard_deletes is None or hard_deletes == "ignore":
1847+
return "ignore"
1848+
1849+
raise DbtValidationError("Invalid setting for property hard_deletes.")
1850+
17981851

17991852
COLUMNS_EQUAL_SQL = """
18001853
with diff_count as (

0 commit comments

Comments
 (0)