@@ -28,153 +28,167 @@ def _transaction():
28
28
return _transaction
29
29
30
30
31
- @pytest .mark .parametrize (
32
- ["set_timestamp" , "get_timestamp" , "expected" ],
33
- [
34
- pytest .param (10 , 10 , "value" , id = "set_timestamp_equal_to_get_timestamp" ),
35
- pytest .param (10 , 11 , "value" , id = "set_timestamp_less_than_get_timestamp" ),
36
- pytest .param (10 , 9 , None , id = "set_timestamp_greater_than_get_timestamp" ),
37
- ],
38
- )
39
- def test_get_last_from_cache (
40
- transaction : TimestampedPartitionTransaction ,
41
- set_timestamp : int ,
42
- get_timestamp : int ,
43
- expected : Any ,
44
- ):
45
- with transaction () as tx :
46
- tx .set_for_timestamp (timestamp = set_timestamp , value = "value" , prefix = b"key" )
47
- assert tx .get_last (timestamp = get_timestamp , prefix = b"key" ) == expected
48
-
49
-
50
- @pytest .mark .parametrize (
51
- ["set_timestamp" , "get_timestamp" , "expected" ],
52
- [
53
- pytest .param (10 , 10 , "value" , id = "set_timestamp_equal_to_get_timestamp" ),
54
- pytest .param (10 , 11 , "value" , id = "set_timestamp_less_than_get_timestamp" ),
55
- pytest .param (10 , 9 , None , id = "set_timestamp_greater_than_get_timestamp" ),
56
- ],
57
- )
58
- def test_get_last_from_store (
59
- transaction : TimestampedPartitionTransaction ,
60
- set_timestamp : int ,
61
- get_timestamp : int ,
62
- expected : Any ,
63
- ):
64
- with transaction () as tx :
65
- tx .set_for_timestamp (timestamp = set_timestamp , value = "value" , prefix = b"key" )
66
-
67
- with transaction () as tx :
68
- assert tx .get_last (timestamp = get_timestamp , prefix = b"key" ) == expected
69
-
70
-
71
- @pytest .mark .parametrize (
72
- ["set_timestamp_stored" , "set_timestamp_cached" , "get_timestamp" , "expected" ],
73
- [
74
- pytest .param (3 , 2 , 5 , "stored" , id = "stored-greater-than-cached" ),
75
- pytest .param (2 , 3 , 5 , "cached" , id = "cached-greater-than-stored" ),
76
- ],
77
- )
78
- def test_get_last_returns_value_for_greater_timestamp (
79
- transaction : TimestampedPartitionTransaction ,
80
- set_timestamp_stored : int ,
81
- set_timestamp_cached : int ,
82
- get_timestamp : int ,
83
- expected : Any ,
84
- ):
85
- with transaction () as tx :
86
- tx .set_for_timestamp (
87
- timestamp = set_timestamp_stored , value = "stored" , prefix = b"key"
88
- )
89
-
90
- with transaction () as tx :
91
- tx .set_for_timestamp (
92
- timestamp = set_timestamp_cached , value = "cached" , prefix = b"key"
93
- )
94
- assert tx .get_last (timestamp = get_timestamp , prefix = b"key" ) == expected
95
-
96
-
97
- def test_get_last_prefix_not_bytes (transaction : TimestampedPartitionTransaction ):
98
- with transaction () as tx :
99
- tx .set_for_timestamp (timestamp = 10 , value = "value" , prefix = "key" )
100
- assert tx .get_last (timestamp = 10 , prefix = "key" ) == "value"
101
- assert tx .get_last (timestamp = 10 , prefix = b'"key"' ) == "value"
102
-
103
-
104
- def test_get_last_for_out_of_order_timestamp (
105
- transaction : TimestampedPartitionTransaction ,
106
- ):
107
- with transaction () as tx :
108
- tx .set_for_timestamp (
109
- timestamp = 10 , value = "value10" , prefix = b"key" , retention_ms = 5
110
- )
111
- assert tx .get_last (timestamp = 10 , prefix = b"key" ) == "value10"
112
- tx .set_for_timestamp (timestamp = 5 , value = "value5" , prefix = b"key" , retention_ms = 5 )
113
- tx .set_for_timestamp (timestamp = 4 , value = "value4" , prefix = b"key" , retention_ms = 5 )
114
-
115
- with transaction () as tx :
116
- assert tx .get_last (timestamp = 5 , prefix = b"key" ) == "value5"
117
-
118
- # Retention watermark is 10 - 5 = 5 so everything lower is ignored
119
- assert tx .get_last (timestamp = 4 , prefix = b"key" ) is None
120
-
121
-
122
- def test_set_for_timestamp_with_prefix_not_bytes (
123
- transaction : TimestampedPartitionTransaction ,
124
- ):
125
- with transaction () as tx :
126
- tx .set_for_timestamp (timestamp = 10 , value = "value" , prefix = "key" )
127
- assert tx .get_last (timestamp = 10 , prefix = "key" ) == "value"
128
- assert tx .get_last (timestamp = 10 , prefix = b'"key"' ) == "value"
129
-
130
-
131
- def test_set_for_timestamp_with_retention_cached (
132
- transaction : TimestampedPartitionTransaction ,
133
- ):
134
- with transaction () as tx :
135
- tx .set_for_timestamp (timestamp = 2 , value = "v2" , prefix = b"key" , retention_ms = 2 )
136
- tx .set_for_timestamp (timestamp = 5 , value = "v5" , prefix = b"key" , retention_ms = 2 )
137
- assert tx .get_last (timestamp = 2 , prefix = b"key" ) == None
138
- assert tx .get_last (timestamp = 5 , prefix = b"key" ) == "v5"
139
-
140
-
141
- def test_set_for_timestamp_with_retention_stored (
142
- transaction : TimestampedPartitionTransaction ,
143
- ):
144
- with transaction () as tx :
145
- tx .set_for_timestamp (timestamp = 2 , value = "v2" , prefix = b"key" , retention_ms = 2 )
146
- tx .set_for_timestamp (timestamp = 5 , value = "v5" , prefix = b"key" , retention_ms = 2 )
147
-
148
- with transaction () as tx :
149
- assert tx .get_last (timestamp = 2 , prefix = b"key" ) == None
150
- assert tx .get_last (timestamp = 5 , prefix = b"key" ) == "v5"
151
-
152
-
153
- def test_expire_multiple_keys (transaction : TimestampedPartitionTransaction ):
154
- with transaction () as tx :
155
- tx .set_for_timestamp (timestamp = 1 , value = "11" , prefix = b"key1" , retention_ms = 10 )
156
- tx .set_for_timestamp (timestamp = 1 , value = "21" , prefix = b"key2" , retention_ms = 10 )
157
- tx .set_for_timestamp (timestamp = 12 , value = "112" , prefix = b"key1" , retention_ms = 10 )
158
- tx .set_for_timestamp (timestamp = 12 , value = "212" , prefix = b"key2" , retention_ms = 10 )
159
-
160
- with transaction () as tx :
161
- assert tx .get (key = 1 , prefix = b"key1" ) is None
162
- assert tx .get (key = 1 , prefix = b"key2" ) is None
163
- assert tx .get (key = 12 , prefix = b"key1" ) == "112"
164
- assert tx .get (key = 12 , prefix = b"key2" ) == "212"
165
-
166
- # Expiration advances only on `set_for_timestamp` calls
167
- assert tx .get_last (timestamp = 30 , prefix = b"key1" ) == "112"
168
- assert tx .get_last (timestamp = 30 , prefix = b"key2" ) == "212"
169
-
170
-
171
- def test_set_for_timestamp_overwrites_value_with_same_timestamp (
172
- transaction : TimestampedPartitionTransaction ,
173
- ):
174
- with transaction () as tx :
175
- tx .set_for_timestamp (timestamp = 1 , value = "11" , prefix = b"key" )
176
- tx .set_for_timestamp (timestamp = 1 , value = "21" , prefix = b"key" )
177
- assert tx .get_last (timestamp = 1 , prefix = b"key" ) == "21"
178
-
179
- with transaction () as tx :
180
- assert tx .get_last (timestamp = 1 , prefix = b"key" ) == "21"
31
+ class TestTimestampedPartitionTransaction :
32
+ @pytest .mark .parametrize (
33
+ ["set_timestamp" , "get_timestamp" , "expected" ],
34
+ [
35
+ pytest .param (10 , 10 , "value" , id = "set_timestamp_equal_to_get_timestamp" ),
36
+ pytest .param (10 , 11 , "value" , id = "set_timestamp_less_than_get_timestamp" ),
37
+ pytest .param (10 , 9 , None , id = "set_timestamp_greater_than_get_timestamp" ),
38
+ ],
39
+ )
40
+ def test_get_last_from_cache (
41
+ self ,
42
+ transaction : TimestampedPartitionTransaction ,
43
+ set_timestamp : int ,
44
+ get_timestamp : int ,
45
+ expected : Any ,
46
+ ):
47
+ with transaction () as tx :
48
+ tx .set_for_timestamp (timestamp = set_timestamp , value = "value" , prefix = b"key" )
49
+ assert tx .get_last (timestamp = get_timestamp , prefix = b"key" ) == expected
50
+
51
+ @pytest .mark .parametrize (
52
+ ["set_timestamp" , "get_timestamp" , "expected" ],
53
+ [
54
+ pytest .param (10 , 10 , "value" , id = "set_timestamp_equal_to_get_timestamp" ),
55
+ pytest .param (10 , 11 , "value" , id = "set_timestamp_less_than_get_timestamp" ),
56
+ pytest .param (10 , 9 , None , id = "set_timestamp_greater_than_get_timestamp" ),
57
+ ],
58
+ )
59
+ def test_get_last_from_store (
60
+ self ,
61
+ transaction : TimestampedPartitionTransaction ,
62
+ set_timestamp : int ,
63
+ get_timestamp : int ,
64
+ expected : Any ,
65
+ ):
66
+ with transaction () as tx :
67
+ tx .set_for_timestamp (timestamp = set_timestamp , value = "value" , prefix = b"key" )
68
+
69
+ with transaction () as tx :
70
+ assert tx .get_last (timestamp = get_timestamp , prefix = b"key" ) == expected
71
+
72
+ @pytest .mark .parametrize (
73
+ ["set_timestamp_stored" , "set_timestamp_cached" , "get_timestamp" , "expected" ],
74
+ [
75
+ pytest .param (3 , 2 , 5 , "stored" , id = "stored-greater-than-cached" ),
76
+ pytest .param (2 , 3 , 5 , "cached" , id = "cached-greater-than-stored" ),
77
+ ],
78
+ )
79
+ def test_get_last_returns_value_for_greater_timestamp (
80
+ self ,
81
+ transaction : TimestampedPartitionTransaction ,
82
+ set_timestamp_stored : int ,
83
+ set_timestamp_cached : int ,
84
+ get_timestamp : int ,
85
+ expected : Any ,
86
+ ):
87
+ with transaction () as tx :
88
+ tx .set_for_timestamp (
89
+ timestamp = set_timestamp_stored , value = "stored" , prefix = b"key"
90
+ )
91
+
92
+ with transaction () as tx :
93
+ tx .set_for_timestamp (
94
+ timestamp = set_timestamp_cached , value = "cached" , prefix = b"key"
95
+ )
96
+ assert tx .get_last (timestamp = get_timestamp , prefix = b"key" ) == expected
97
+
98
+ def test_get_last_prefix_not_bytes (
99
+ self , transaction : TimestampedPartitionTransaction
100
+ ):
101
+ with transaction () as tx :
102
+ tx .set_for_timestamp (timestamp = 10 , value = "value" , prefix = "key" )
103
+ assert tx .get_last (timestamp = 10 , prefix = "key" ) == "value"
104
+ assert tx .get_last (timestamp = 10 , prefix = b'"key"' ) == "value"
105
+
106
+ def test_get_last_for_out_of_order_timestamp (
107
+ self ,
108
+ transaction : TimestampedPartitionTransaction ,
109
+ ):
110
+ with transaction () as tx :
111
+ tx .set_for_timestamp (
112
+ timestamp = 10 , value = "value10" , prefix = b"key" , retention_ms = 5
113
+ )
114
+ assert tx .get_last (timestamp = 10 , prefix = b"key" ) == "value10"
115
+ tx .set_for_timestamp (
116
+ timestamp = 5 , value = "value5" , prefix = b"key" , retention_ms = 5
117
+ )
118
+ tx .set_for_timestamp (
119
+ timestamp = 4 , value = "value4" , prefix = b"key" , retention_ms = 5
120
+ )
121
+
122
+ with transaction () as tx :
123
+ assert tx .get_last (timestamp = 5 , prefix = b"key" ) == "value5"
124
+
125
+ # Retention watermark is 10 - 5 = 5 so everything lower is ignored
126
+ assert tx .get_last (timestamp = 4 , prefix = b"key" ) is None
127
+
128
+ def test_set_for_timestamp_with_prefix_not_bytes (
129
+ self ,
130
+ transaction : TimestampedPartitionTransaction ,
131
+ ):
132
+ with transaction () as tx :
133
+ tx .set_for_timestamp (timestamp = 10 , value = "value" , prefix = "key" )
134
+ assert tx .get_last (timestamp = 10 , prefix = "key" ) == "value"
135
+ assert tx .get_last (timestamp = 10 , prefix = b'"key"' ) == "value"
136
+
137
+ def test_set_for_timestamp_with_retention_cached (
138
+ self ,
139
+ transaction : TimestampedPartitionTransaction ,
140
+ ):
141
+ with transaction () as tx :
142
+ tx .set_for_timestamp (timestamp = 2 , value = "v2" , prefix = b"key" , retention_ms = 2 )
143
+ tx .set_for_timestamp (timestamp = 5 , value = "v5" , prefix = b"key" , retention_ms = 2 )
144
+ assert tx .get_last (timestamp = 2 , prefix = b"key" ) is None
145
+ assert tx .get_last (timestamp = 5 , prefix = b"key" ) == "v5"
146
+
147
+ def test_set_for_timestamp_with_retention_stored (
148
+ self ,
149
+ transaction : TimestampedPartitionTransaction ,
150
+ ):
151
+ with transaction () as tx :
152
+ tx .set_for_timestamp (timestamp = 2 , value = "v2" , prefix = b"key" , retention_ms = 2 )
153
+ tx .set_for_timestamp (timestamp = 5 , value = "v5" , prefix = b"key" , retention_ms = 2 )
154
+
155
+ with transaction () as tx :
156
+ assert tx .get_last (timestamp = 2 , prefix = b"key" ) is None
157
+ assert tx .get_last (timestamp = 5 , prefix = b"key" ) == "v5"
158
+
159
+ def test_expire_multiple_keys (self , transaction : TimestampedPartitionTransaction ):
160
+ with transaction () as tx :
161
+ tx .set_for_timestamp (
162
+ timestamp = 1 , value = "11" , prefix = b"key1" , retention_ms = 10
163
+ )
164
+ tx .set_for_timestamp (
165
+ timestamp = 1 , value = "21" , prefix = b"key2" , retention_ms = 10
166
+ )
167
+ tx .set_for_timestamp (
168
+ timestamp = 12 , value = "112" , prefix = b"key1" , retention_ms = 10
169
+ )
170
+ tx .set_for_timestamp (
171
+ timestamp = 12 , value = "212" , prefix = b"key2" , retention_ms = 10
172
+ )
173
+
174
+ with transaction () as tx :
175
+ assert tx .get (key = 1 , prefix = b"key1" ) is None
176
+ assert tx .get (key = 1 , prefix = b"key2" ) is None
177
+ assert tx .get (key = 12 , prefix = b"key1" ) == "112"
178
+ assert tx .get (key = 12 , prefix = b"key2" ) == "212"
179
+
180
+ # Expiration advances only on `set_for_timestamp` calls
181
+ assert tx .get_last (timestamp = 30 , prefix = b"key1" ) == "112"
182
+ assert tx .get_last (timestamp = 30 , prefix = b"key2" ) == "212"
183
+
184
+ def test_set_for_timestamp_overwrites_value_with_same_timestamp (
185
+ self ,
186
+ transaction : TimestampedPartitionTransaction ,
187
+ ):
188
+ with transaction () as tx :
189
+ tx .set_for_timestamp (timestamp = 1 , value = "11" , prefix = b"key" )
190
+ tx .set_for_timestamp (timestamp = 1 , value = "21" , prefix = b"key" )
191
+ assert tx .get_last (timestamp = 1 , prefix = b"key" ) == "21"
192
+
193
+ with transaction () as tx :
194
+ assert tx .get_last (timestamp = 1 , prefix = b"key" ) == "21"
0 commit comments