Skip to content

Commit ea25a89

Browse files
Lita Chogarrettheel
authored andcommitted
Add consistent_read parameter to Model.scan (#311)
1 parent acad93d commit ea25a89

File tree

8 files changed

+61
-14
lines changed

8 files changed

+61
-14
lines changed

pynamodb/connection/base.py

Lines changed: 9 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -982,7 +982,8 @@ def rate_limited_scan(self,
982982
read_capacity_to_consume_per_second=10,
983983
allow_rate_limited_scan_without_consumed_capacity=None,
984984
max_sleep_between_retry=10,
985-
max_consecutive_exceptions=10):
985+
max_consecutive_exceptions=10,
986+
consistent_read=None):
986987
"""
987988
Performs a rate limited scan on the table. The API uses the scan API to fetch items from
988989
DynamoDB. The rate_limited_scan uses the 'ConsumedCapacity' value returned from DynamoDB to
@@ -1007,6 +1008,7 @@ def rate_limited_scan(self,
10071008
throttling/rate limit scenarios
10081009
:param max_consecutive_exceptions: Max number of consecutive ProvisionedThroughputExceededException
10091010
exception for scan to exit
1011+
:param consistent_read: enable consistent read
10101012
"""
10111013
read_capacity_to_consume_per_ms = float(read_capacity_to_consume_per_second) / 1000
10121014
if allow_rate_limited_scan_without_consumed_capacity is None:
@@ -1038,7 +1040,8 @@ def rate_limited_scan(self,
10381040
return_consumed_capacity=TOTAL,
10391041
scan_filter=scan_filter,
10401042
segment=segment,
1041-
total_segments=total_segments
1043+
total_segments=total_segments,
1044+
consistent_read=consistent_read
10421045
)
10431046
for item in data.get(ITEMS):
10441047
yield item
@@ -1120,7 +1123,8 @@ def scan(self,
11201123
return_consumed_capacity=None,
11211124
exclusive_start_key=None,
11221125
segment=None,
1123-
total_segments=None):
1126+
total_segments=None,
1127+
consistent_read=None):
11241128
"""
11251129
Performs the scan operation
11261130
"""
@@ -1154,6 +1158,8 @@ def scan(self,
11541158
operation_kwargs[SCAN_FILTER][key][ATTR_VALUE_LIST] = values
11551159
if conditional_operator:
11561160
operation_kwargs.update(self.get_conditional_operator(conditional_operator))
1161+
if consistent_read:
1162+
operation_kwargs[CONSISTENT_READ] = consistent_read
11571163
try:
11581164
return self.dispatch(SCAN, operation_kwargs)
11591165
except BOTOCORE_EXCEPTIONS as e:

pynamodb/connection/table.py

Lines changed: 8 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -144,7 +144,8 @@ def rate_limited_scan(self,
144144
read_capacity_to_consume_per_second=None,
145145
allow_rate_limited_scan_without_consumed_capacity=None,
146146
max_sleep_between_retry=None,
147-
max_consecutive_exceptions=None):
147+
max_consecutive_exceptions=None,
148+
consistent_read=None):
148149
"""
149150
Performs the scan operation with rate limited
150151
"""
@@ -162,7 +163,8 @@ def rate_limited_scan(self,
162163
read_capacity_to_consume_per_second=read_capacity_to_consume_per_second,
163164
allow_rate_limited_scan_without_consumed_capacity=allow_rate_limited_scan_without_consumed_capacity,
164165
max_sleep_between_retry=max_sleep_between_retry,
165-
max_consecutive_exceptions=max_consecutive_exceptions)
166+
max_consecutive_exceptions=max_consecutive_exceptions,
167+
consistent_read=consistent_read)
166168

167169
def scan(self,
168170
attributes_to_get=None,
@@ -172,7 +174,8 @@ def scan(self,
172174
return_consumed_capacity=None,
173175
segment=None,
174176
total_segments=None,
175-
exclusive_start_key=None):
177+
exclusive_start_key=None,
178+
consistent_read=None):
176179
"""
177180
Performs the scan operation
178181
"""
@@ -185,7 +188,8 @@ def scan(self,
185188
return_consumed_capacity=return_consumed_capacity,
186189
segment=segment,
187190
total_segments=total_segments,
188-
exclusive_start_key=exclusive_start_key)
191+
exclusive_start_key=exclusive_start_key,
192+
consistent_read=consistent_read)
189193

190194
def query(self,
191195
hash_key,

pynamodb/models.py

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -675,6 +675,7 @@ def rate_limited_scan(cls,
675675
allow_rate_limited_scan_without_consumed_capacity=None,
676676
max_sleep_between_retry=10,
677677
max_consecutive_exceptions=30,
678+
consistent_read=None,
678679
**filters):
679680
"""
680681
Scans the items in the table at a definite rate.
@@ -698,6 +699,7 @@ def rate_limited_scan(cls,
698699
throttling/rate limit scenarios
699700
:param max_consecutive_exceptions: Max number of consecutive provision throughput exceeded
700701
exceptions for scan to exit
702+
:param consistent_read: If True, a consistent read is performed
701703
"""
702704

703705
cls._conditional_operator_check(conditional_operator)
@@ -723,6 +725,7 @@ def rate_limited_scan(cls,
723725
allow_rate_limited_scan_without_consumed_capacity=allow_rate_limited_scan_without_consumed_capacity,
724726
max_sleep_between_retry=max_sleep_between_retry,
725727
max_consecutive_exceptions=max_consecutive_exceptions,
728+
consistent_read=consistent_read,
726729
)
727730

728731
for item in scan_result:
@@ -736,6 +739,7 @@ def scan(cls,
736739
conditional_operator=None,
737740
last_evaluated_key=None,
738741
page_size=None,
742+
consistent_read=None,
739743
**filters):
740744
"""
741745
Iterates through all items in the table
@@ -747,6 +751,7 @@ def scan(cls,
747751
:param last_evaluated_key: If set, provides the starting point for scan.
748752
:param page_size: Page size of the scan to DynamoDB
749753
:param filters: A list of item filters
754+
:param consistent_read: If True, a consistent read is performed
750755
"""
751756
cls._conditional_operator_check(conditional_operator)
752757
key_filter, scan_filter = cls._build_filters(
@@ -765,7 +770,8 @@ def scan(cls,
765770
limit=page_size,
766771
scan_filter=key_filter,
767772
total_segments=total_segments,
768-
conditional_operator=conditional_operator
773+
conditional_operator=conditional_operator,
774+
consistent_read=consistent_read
769775
)
770776
log.debug("Fetching first scan page")
771777
last_evaluated_key = data.get(LAST_EVALUATED_KEY, None)

pynamodb/tests/test_base_connection.py

Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1606,6 +1606,23 @@ def test_scan(self):
16061606
req.return_value = DESCRIBE_TABLE_DATA
16071607
conn.describe_table(table_name)
16081608

1609+
with patch(PATCH_METHOD) as req:
1610+
req.return_value = {}
1611+
conn.scan(
1612+
table_name,
1613+
segment=0,
1614+
total_segments=22,
1615+
consistent_read=True
1616+
)
1617+
params = {
1618+
'ReturnConsumedCapacity': 'TOTAL',
1619+
'TableName': table_name,
1620+
'Segment': 0,
1621+
'TotalSegments': 22,
1622+
'ConsistentRead': True
1623+
}
1624+
self.assertDictEqual(req.call_args[0][1], params)
1625+
16091626
with patch(PATCH_METHOD) as req:
16101627
req.return_value = {}
16111628
conn.scan(

pynamodb/tests/test_model.py

Lines changed: 14 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1906,6 +1906,16 @@ def fake_scan(*args):
19061906
self.assertEquals(req.mock_calls[0][1][1]['Limit'], 4)
19071907
self.assertEqual(count, 4)
19081908

1909+
with patch(PATCH_METHOD, new=mock_scan) as req:
1910+
count = 0
1911+
for item in UserModel.scan(limit=4, consistent_read=True):
1912+
count += 1
1913+
self.assertIsNotNone(item)
1914+
self.assertEqual(len(req.mock_calls), 2)
1915+
self.assertEquals(req.mock_calls[1][1][1]['Limit'], 4)
1916+
self.assertEquals(req.mock_calls[1][1][1]['ConsistentRead'], True)
1917+
self.assertEqual(count, 4)
1918+
19091919
with patch(PATCH_METHOD) as req:
19101920
items = []
19111921
for idx in range(10):
@@ -1997,7 +2007,8 @@ def test_rate_limited_scan(self):
19972007
allow_rate_limited_scan_without_consumed_capacity=False,
19982008
max_sleep_between_retry=4,
19992009
max_consecutive_exceptions=22,
2000-
attributes_to_get=['X1', 'X2']
2010+
attributes_to_get=['X1', 'X2'],
2011+
consistent_read=True
20012012
)
20022013
self.assertEqual(1, len(list(result)))
20032014
self.assertEqual('UserModel', req.call_args[0][0])
@@ -2014,7 +2025,8 @@ def test_rate_limited_scan(self):
20142025
'read_capacity_to_consume_per_second': 33,
20152026
'allow_rate_limited_scan_without_consumed_capacity': False,
20162027
'max_sleep_between_retry': 4,
2017-
'max_consecutive_exceptions': 22
2028+
'max_consecutive_exceptions': 22,
2029+
'consistent_read': True
20182030
}
20192031
self.assertEqual(params, req.call_args[1])
20202032

pynamodb/tests/test_table_connection.py

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -482,7 +482,8 @@ def test_rate_limited_scan(self):
482482
read_capacity_to_consume_per_second=12,
483483
allow_rate_limited_scan_without_consumed_capacity=False,
484484
max_sleep_between_retry=3,
485-
max_consecutive_exceptions=7
485+
max_consecutive_exceptions=7,
486+
consistent_read=True
486487
)
487488
self.assertEqual(self.test_table_name, req.call_args[0][0])
488489
params = {
@@ -498,6 +499,7 @@ def test_rate_limited_scan(self):
498499
'read_capacity_to_consume_per_second': 12,
499500
'allow_rate_limited_scan_without_consumed_capacity': False,
500501
'max_sleep_between_retry': 3,
501-
'max_consecutive_exceptions': 7
502+
'max_consecutive_exceptions': 7,
503+
'consistent_read': True
502504
}
503505
self.assertEqual(params, req.call_args[1])

requirements.txt

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,2 +1,2 @@
1-
botocore==1.1.2
1+
botocore==1.2.0
22
six==1.9.0

setup.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -13,7 +13,7 @@
1313

1414
install_requires = [
1515
'six',
16-
'botocore>=1.0.0',
16+
'botocore>=1.2.0',
1717
'python-dateutil>=2.1,<3.0.0',
1818
]
1919

0 commit comments

Comments
 (0)