From 757447e60cb478ff09613d9a5aa440a434932422 Mon Sep 17 00:00:00 2001
From: Tejas Ganesh Naik <“tejasgn@amazon.com”>
Date: Tue, 24 Jun 2025 13:50:55 -0700
Subject: [PATCH] capturing ddb-throttling err in Python

Retry do_batch_get with capped exponential backoff when DynamoDB raises a
throttling ClientError, logging each entry of the error response's
ThrottlingReasons list when it is present and the plain error message
otherwise. Non-throttling ClientErrors are re-raised unchanged.

Also log the throttling reasons in fill_table and archive_movies before
re-raising the error.

---
 .../dynamodb/batching/dynamo_batching.py      | 67 +++++++++++++------
 1 file changed, 46 insertions(+), 21 deletions(-)

diff --git a/python/example_code/dynamodb/batching/dynamo_batching.py b/python/example_code/dynamodb/batching/dynamo_batching.py
index f05bb4759c1..3ee0911a8cd 100644
--- a/python/example_code/dynamodb/batching/dynamo_batching.py
+++ b/python/example_code/dynamodb/batching/dynamo_batching.py
@@ -82,26 +82,45 @@ def do_batch_get(batch_keys):
     sleepy_time = 1  # Start with 1 second of sleep, then exponentially increase.
     retrieved = {key: [] for key in batch_keys}
     while tries < max_tries:
-        response = dynamodb.batch_get_item(RequestItems=batch_keys)
-        # Collect any retrieved items and retry unprocessed keys.
-        for key in response.get("Responses", []):
-            retrieved[key] += response["Responses"][key]
-        unprocessed = response["UnprocessedKeys"]
-        if len(unprocessed) > 0:
-            batch_keys = unprocessed
-            unprocessed_count = sum(
-                [len(batch_key["Keys"]) for batch_key in batch_keys.values()]
-            )
-            logger.info(
-                "%s unprocessed keys returned. Sleep, then retry.", unprocessed_count
-            )
-            tries += 1
-            if tries < max_tries:
-                logger.info("Sleeping for %s seconds.", sleepy_time)
-                time.sleep(sleepy_time)
-                sleepy_time = min(sleepy_time * 2, 32)
-        else:
-            break
+        try:
+            response = dynamodb.batch_get_item(RequestItems=batch_keys)
+            # Collect any retrieved items and retry unprocessed keys.
+            for key in response.get("Responses", []):
+                retrieved[key] += response["Responses"][key]
+            unprocessed = response["UnprocessedKeys"]
+            if len(unprocessed) > 0:
+                batch_keys = unprocessed
+                unprocessed_count = sum(
+                    [len(batch_key["Keys"]) for batch_key in batch_keys.values()]
+                )
+                logger.info(
+                    "%s unprocessed keys returned. Sleep, then retry.", unprocessed_count
+                )
+                tries += 1
+                if tries < max_tries:
+                    logger.info("Sleeping for %s seconds.", sleepy_time)
+                    time.sleep(sleepy_time)
+                    sleepy_time = min(sleepy_time * 2, 32)
+            else:
+                break
+        except ClientError as error:
+            if error.response["Error"]["Code"] in ["ProvisionedThroughputExceededException", "ProvisionedThroughputExceeded", "ThrottlingException", "GsiProvisionedThroughputExceeded", "RequestLimitExceeded"]:
+                # Check for the ThrottlingReasons field on the error response
+                if "ThrottlingReasons" in error.response:
+                    for reason in error.response["ThrottlingReasons"]:
+                        logger.warning(
+                            "Throttling detected - Reason: %s, Resource: %s", reason.get("reason"), reason.get("resource")
+                        )
+                else:
+                    # Fallback to previous message
+                    logger.warning("Throttling detected: %s", error.response["Error"]["Message"])
+                tries += 1
+                if tries < max_tries:
+                    logger.info("Sleeping for %s seconds.", sleepy_time)
+                    time.sleep(sleepy_time)
+                    sleepy_time = min(sleepy_time * 2, 32)
+            else:
+                raise
 
     return retrieved
 
@@ -129,7 +148,9 @@ def fill_table(table, table_data):
             for item in table_data:
                 writer.put_item(Item=item)
         logger.info("Loaded data into table %s.", table.name)
-    except ClientError:
+    except ClientError as error:
+        if "ThrottlingReasons" in error.response:
+            logger.error("Batch write throttled with reasons: %s", error.response["ThrottlingReasons"])
         logger.exception("Couldn't load data into table %s.", table.name)
         raise
 
@@ -215,6 +236,10 @@ def archive_movies(movie_table, movie_data):
                 "Got expected exception when trying to put duplicate records into the "
                 "archive table."
             )
+        elif error.response["Error"]["Code"] in ["ProvisionedThroughputExceededException", "ThrottlingException"]:
+            if "ThrottlingReasons" in error.response:
+                logger.error("Archive operation throttled: %s", error.response["ThrottlingReasons"])
+            raise
         else:
             logger.exception(
                 "Got unexpected exception when trying to put duplicate records into "