
Commit 347b64b

edholland and ikonst committed
Implement exponential backoff for batch writes (#728)
The old code failed to respect the configured backoff policy: unprocessed batch-write items were retried in an infinite tight loop. This PR changes the batch write path to apply the same backoff/retry policy as the rest of the API.

* Implement exponential backoff for batch writes
* Review fix: update pynamodb/connection/base.py
* Add test for throwing PutError when max retries exceeded
* Fix format string type

Co-Authored-By: Ilya Konstantinov <ilya.konstantinov@gmail.com>
1 parent 07d7e56 commit 347b64b
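
The policy the commit message refers to is full-jitter exponential backoff: each resend waits a random interval drawn from a window that doubles with every retry, which is the formula the models.py hunk below uses. A minimal illustrative sketch, assuming a base_backoff_ms value in milliseconds; the helper name is invented here and is not part of PynamoDB's API:

import random
import time

def backoff_sleep(retries, base_backoff_ms=25):
    # Full jitter: pick a random point in [0, base_backoff_ms * 2**retries] ms,
    # so the window doubles on every retry while the actual sleep stays randomized.
    sleep_time = random.randint(0, base_backoff_ms * (2 ** retries)) / 1000
    time.sleep(sleep_time)
    return sleep_time

# Example: successive retries sleep within growing windows of
# 0-25 ms, 0-50 ms, 0-100 ms, ... before each resend attempt.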

File tree

3 files changed: +67 -4 lines changed

pynamodb/connection/base.py
pynamodb/models.py
tests/test_model.py

pynamodb/connection/base.py

Lines changed: 2 additions & 1 deletion
@@ -61,6 +61,7 @@
 from pynamodb.types import HASH, RANGE

 BOTOCORE_EXCEPTIONS = (BotoCoreError, ClientError)
+RATE_LIMITING_ERROR_CODES = ['ProvisionedThroughputExceededException', 'ThrottlingException']

 log = logging.getLogger(__name__)
 log.addHandler(logging.NullHandler())

@@ -442,7 +443,7 @@ def _make_api_call(self, operation_name, operation_kwargs):
                 if is_last_attempt_for_exceptions:
                     log.debug('Reached the maximum number of retry attempts: %s', attempt_number)
                     raise
-                elif status_code < 500 and code != 'ProvisionedThroughputExceededException':
+                elif status_code < 500 and code not in RATE_LIMITING_ERROR_CODES:
                     # We don't retry on a ConditionalCheckFailedException or other 4xx (except for
                     # throughput related errors) because we assume they will fail in perpetuity.
                     # Retrying when there is already contention could cause other problems
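
The effect of this hunk is that ThrottlingException now gets the same treatment as ProvisionedThroughputExceededException: both are considered retryable, while other 4xx responses still fail immediately. A simplified sketch of that decision, assuming the same inputs the surrounding code inspects (the should_retry helper is illustrative, not a PynamoDB function):

RATE_LIMITING_ERROR_CODES = ['ProvisionedThroughputExceededException', 'ThrottlingException']

def should_retry(status_code, error_code, is_last_attempt):
    # Never retry once the attempt budget is exhausted.
    if is_last_attempt:
        return False
    # 4xx responses are treated as permanent failures unless they
    # signal rate limiting (throughput exceeded or throttling).
    if status_code < 500 and error_code not in RATE_LIMITING_ERROR_CODES:
        return False
    # 5xx responses and rate-limiting errors are worth another attempt.
    return True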

pynamodb/models.py

Lines changed: 12 additions & 2 deletions
@@ -2,6 +2,7 @@
 DynamoDB Models for PynamoDB
 """
 import json
+import random
 import time
 import six
 import logging

@@ -11,7 +12,7 @@
 from six import add_metaclass

 from pynamodb.expressions.condition import NotExists, Comparison
-from pynamodb.exceptions import DoesNotExist, TableDoesNotExist, TableError, InvalidStateError
+from pynamodb.exceptions import DoesNotExist, TableDoesNotExist, TableError, InvalidStateError, PutError
 from pynamodb.attributes import (
     Attribute, AttributeContainer, AttributeContainerMeta, MapAttribute, TTLAttribute, VersionAttribute
 )

@@ -52,6 +53,7 @@ def __init__(self, model, auto_commit=True):
         self.auto_commit = auto_commit
         self.max_operations = BATCH_WRITE_PAGE_LIMIT
         self.pending_operations = []
+        self.failed_operations = []

     def __enter__(self):
         return self

@@ -130,16 +132,24 @@ def commit(self):
         )
         if data is None:
             return
+        retries = 0
         unprocessed_items = data.get(UNPROCESSED_ITEMS, {}).get(self.model.Meta.table_name)
         while unprocessed_items:
+            sleep_time = random.randint(0, self.model.Meta.base_backoff_ms * (2 ** retries)) / 1000
+            time.sleep(sleep_time)
+            retries += 1
+            if retries >= self.model.Meta.max_retry_attempts:
+                self.failed_operations = unprocessed_items
+                raise PutError("Failed to batch write items: max_retry_attempts exceeded")
             put_items = []
             delete_items = []
             for item in unprocessed_items:
                 if PUT_REQUEST in item:
                     put_items.append(item.get(PUT_REQUEST).get(ITEM))
                 elif DELETE_REQUEST in item:
                     delete_items.append(item.get(DELETE_REQUEST).get(KEY))
-            log.info("Resending %s unprocessed keys for batch operation", len(unprocessed_items))
+            log.info("Resending %d unprocessed keys for batch operation after %d seconds sleep",
+                     len(unprocessed_items), sleep_time)
             data = self.model._get_connection().batch_write_item(
                 put_items=put_items,
                 delete_items=delete_items
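
From the caller's perspective, the retry budget and the backoff base come from the model's Meta, and whatever DynamoDB keeps reporting as unprocessed is left on batch.failed_operations when PutError is raised. A hedged usage sketch: the Thread model and its attribute are invented for illustration, while max_retry_attempts and base_backoff_ms are the Meta settings the loop above reads.

from pynamodb.attributes import UnicodeAttribute
from pynamodb.exceptions import PutError
from pynamodb.models import Model

class Thread(Model):
    class Meta:
        table_name = 'Thread'
        max_retry_attempts = 4   # resend attempts before giving up
        base_backoff_ms = 25     # base of the doubling sleep window

    forum_name = UnicodeAttribute(hash_key=True)

try:
    with Thread.batch_write() as batch:
        for name in ('general', 'random', 'help'):
            batch.save(Thread(forum_name=name))
except PutError:
    # Items DynamoDB kept returning as unprocessed are preserved on the
    # batch context so the caller can inspect or resubmit them.
    leftover = batch.failed_operations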

tests/test_model.py

Lines changed: 53 additions & 1 deletion
@@ -15,7 +15,7 @@

 from .deep_eq import deep_eq
 from pynamodb.connection.util import pythonic
-from pynamodb.exceptions import DoesNotExist, TableError
+from pynamodb.exceptions import DoesNotExist, TableError, PutError
 from pynamodb.types import RANGE
 from pynamodb.constants import (
     ITEM, STRING_SHORT, ALL, KEYS_ONLY, INCLUDE, REQUEST_ITEMS, UNPROCESSED_KEYS, CAMEL_COUNT,

@@ -236,6 +236,20 @@ class Meta:
     ttl = TTLAttribute(null=True)


+
+class BatchModel(Model):
+    """
+    A testing model
+    """
+
+    class Meta:
+        table_name = 'BatchModel'
+        max_retry_attempts = 0
+
+    user_name = UnicodeAttribute(hash_key=True)
+
+
+
 class HostSpecificModel(Model):
     """
     A testing model

@@ -1940,6 +1954,7 @@ def test_batch_write(self):
            for item in items:
                batch.save(item)

+
    def test_batch_write_with_unprocessed(self):
        picture_blob = b'FFD8FFD8'


@@ -1984,6 +1999,43 @@ def test_batch_write_with_unprocessed(self):

        self.assertEqual(len(req.mock_calls), 3)

+    def test_batch_write_raises_put_error(self):
+        items = []
+        for idx in range(10):
+            items.append(BatchModel(
+                '{}'.format(idx)
+            ))
+
+        unprocessed_items = []
+        for idx in range(5, 10):
+            unprocessed_items.append({
+                'PutRequest': {
+                    'Item': {
+                        'user_name': {STRING_SHORT: 'daniel'},
+                    }
+                }
+            })
+
+        with patch(PATCH_METHOD) as req:
+            req.side_effect = [
+                {
+                    UNPROCESSED_ITEMS: {
+                        BatchModel.Meta.table_name: unprocessed_items[:2],
+                    }
+                },
+                {
+                    UNPROCESSED_ITEMS: {
+                        BatchModel.Meta.table_name: unprocessed_items[2:],
+                    }
+                },
+                {}
+            ]
+            with self.assertRaises(PutError):
+                with BatchModel.batch_write() as batch:
+                    for item in items:
+                        batch.save(item)
+            self.assertEqual(len(batch.failed_operations), 3)
+
    def test_index_queries(self):
        """
        Model.Index.Query

Comments (0)