Skip to content

[S3] More comprehensive error handling #2451

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 6 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 5 additions & 1 deletion metaflow/plugins/datatools/s3/s3.py
Original file line number Diff line number Diff line change
Expand Up @@ -525,7 +525,7 @@ def __init__(
run: Optional[Union[FlowSpec, "metaflow.Run"]] = None,
s3root: Optional[str] = None,
encryption: Optional[str] = S3_SERVER_SIDE_ENCRYPTION,
**kwargs
**kwargs,
):
if run:
# 1. use a (current) run ID with optional customizations
Expand Down Expand Up @@ -853,6 +853,8 @@ def _head():
raise MetaflowS3NotFound()
elif info["error"] == s3op.ERROR_URL_ACCESS_DENIED:
raise MetaflowS3AccessDenied()
elif info["error"] == s3op.ERROR_INVALID_REQUEST:
raise MetaflowS3Exception("Invalid request for %s" % s3url)
else:
raise MetaflowS3Exception("Got error: %d" % info["error"])
else:
Expand Down Expand Up @@ -1379,6 +1381,8 @@ def _one_boto_op(self, op, url, create_tmp_file=True):
raise MetaflowS3AccessDenied(url)
elif error_code == 416:
raise MetaflowS3InvalidRange(err)
elif error_code == 400:
raise MetaflowS3Exception(f"Invalid request for {url}: {str(err)}")
elif error_code == "NoSuchBucket":
raise MetaflowS3URLException("Specified S3 bucket doesn't exist.")
error = str(err)
Expand Down
245 changes: 214 additions & 31 deletions metaflow/plugins/datatools/s3/s3op.py
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,194 @@

S3Config = namedtuple("S3Config", "role session_vars client_params")

# Error code mappings for AWS S3 and general AWS services
# - S3 Error Responses: https://docs.aws.amazon.com/AmazonS3/latest/API/ErrorResponses.html
# - Boto3 Retries: https://boto3.amazonaws.com/v1/documentation/api/latest/guide/retries.html

# Permission or access-related errors → 403 Forbidden
PERMISSION_ERRORS = {
"AccessDenied",
"AccessDeniedException",
"AccountProblem",
"AllAccessDisabled",
"AuthFailure",
"ExpiredToken",
"InvalidAccessKeyId",
"InvalidPayer",
"InvalidSecurity",
"InvalidToken",
"NotSignedUp",
"RequestTimeTooSkewed",
"SignatureDoesNotMatch",
"UnauthorizedOperation",
"UnrecognizedClientException",
}

# Not found errors → 404 Not Found
NOT_FOUND_ERRORS = {
"AccessPointNotFound",
"NoSuchAccessGrantsLocationError",
"NoSuchAccessGrantError",
"NoSuchBucket",
"NoSuchBucketPolicy",
"NoSuchKey",
"NoSuchLifecycleConfiguration",
"NoSuchMultiRegionAccessPoint",
"NoSuchUpload",
"NoSuchVersion",
"NoSuchWebsiteConfiguration",
"ReplicationConfigurationNotFoundError",
"ServerSideEncryptionConfigurationNotFoundError",
"NotFound",
}

# Range/invalid byte-range errors → 416
RANGE_ERRORS = {
"InvalidRange",
}

# Server-side throttling, timeout, or transient errors → 503
TRANSIENT_ERRORS = {
"BandwidthLimitExceeded",
"ConnectionError",
"EC2ThrottledException",
"InternalError",
"InternalFailure",
"PriorRequestNotComplete",
"ProvisionedThroughputExceededException",
"RequestLimitExceeded",
"RequestThrottled",
"RequestThrottledException",
"RequestTimeout",
"RequestTimeoutException",
"ServerError",
"ServiceUnavailable",
"SlowDown",
"TemporaryRedirect",
"ThrottledException",
"Throttling",
"ThrottlingException",
"TooManyRequestsException",
"TransactionInProgressException",
"Unavailable",
"ReplicationInternalError",
"ReplicationTooManyRequests",
}

# Fatal/unrecoverable → 400
FATAL_ERRORS = {
"AccessControlListNotSupported",
"AccessGrantAlreadyExists",
"AccessGrantsInstanceAlreadyExists",
"AccessGrantsInstanceNotEmptyError",
"AccessGrantsInstanceNotExistsError",
"AccessGrantsInstanceResourcePolicyNotExists",
"AccessGrantsLocationAlreadyExistsError",
"AccessGrantsLocationNotEmptyError",
"AccessGrantsLocationsQuotaExceededError",
"AccessGrantsQuotaExceededError",
"AmbiguousGrantByEmailAddress",
"BadDigest",
"BadRequest",
"BucketAlreadyExists",
"BucketAlreadyOwnedByYou",
"CrossLocationLoggingProhibited",
"DryRunOperation",
"EntityTooLarge",
"EntityTooSmall",
"ExpiredObjectDeleteMarker",
"HTTPClientError",
"IllegalLocationConstraintException",
"IllegalVersioningConfigurationException",
"IncompleteBody",
"IncorrectNumberOfFilesInPostRequest",
"InlineDataTooLarge",
"InsufficientData",
"InvalidAccessGrant",
"InvalidAccessGrantsLocation",
"InvalidAddressingHeader",
"InvalidArgument",
"InvalidBucketName",
"InvalidBucketState",
"InvalidClientTokenId",
"InvalidDigest",
"InvalidEncryptionAlgorithmError",
"InvalidIamRole",
"InvalidIdentityCenterInstance",
"InvalidLocationConstraint",
"InvalidObjectState",
"InvalidParameterCombination",
"InvalidParameterValue",
"InvalidPart",
"InvalidPartOrder",
"InvalidPolicyDocument",
"InvalidQueryParameter",
"InvalidRange",
"InvalidRequest",
"InvalidResourcePolicy",
"InvalidStorageClass",
"InvalidTag",
"InvalidTags",
"InvalidTargetBucketForLogging",
"InvalidToken",
"InvalidURI",
"KeyTooLongError",
"LambdaInvalidResponse",
"LambdaInvocationFailed",
"LambdaNotFound",
"LambdaPermissionError",
"LambdaResponseNotReceived",
"LambdaRuntimeError",
"LambdaTimeout",
"LimitExceededException",
"MalformedACLError",
"MalformedPolicyDocument",
"MalformedPOSTRequest",
"MalformedQueryString",
"MalformedXML",
"MaxMessageLengthExceeded",
"MaxPostPreDataLengthExceededError",
"MetadataTooLarge",
"MethodNotAllowed",
"MissingAttachment",
"MissingContentLength",
"MissingParameter",
"MissingRequestBodyError",
"MissingSecurityHeader",
"MultiRegionAccessPointAlreadyOwnedByYou",
"MultiRegionAccessPointModifiedByAnotherRequest",
"MultiRegionAccessPointNotReady",
"MultiRegionAccessPointSameBucketRegion",
"MultiRegionAccessPointUnsupportedRegion",
"NoLoggingStatusForKey",
"NotImplemented",
"NotSignedUp",
"OperationAborted",
"OptInRequired",
"PermanentRedirect",
"PreconditionFailed",
"Redirect",
"ReplicationParameterValueError",
"RequestIsNotMultiPartContent",
"RequestTorrentOfBucketError",
"RestoreAlreadyInProgress",
"StsNotAuthorizedError",
"StsPackedPolicyTooLargeError",
"StsValidationError",
"TokenRefreshRequired",
"TooManyBuckets",
"TooManyConfigurations",
"TooManyElements",
"TooManyTags",
"UnexpectedContent",
"UnresolvableGrantByEmailAddress",
"UnsupportedOperation",
"UnsupportedProtocol",
"UserKeyMustBeSpecified",
"ValidationError",
"ValidationException",
}


class S3Url(object):
def __init__(
Expand Down Expand Up @@ -107,6 +295,7 @@ def __str__(self):
ERROR_INVALID_RANGE = 11
ERROR_TRANSIENT = 12
ERROR_OUT_OF_DISK_SPACE = 13
ERROR_INVALID_REQUEST = 14


def format_result_line(idx, prefix, url="", local=""):
Expand All @@ -129,37 +318,20 @@ def normalize_client_error(err):
try:
return int(error_code)
except ValueError:
if error_code in ("AccessDenied", "AllAccessDisabled", "InvalidAccessKeyId"):
return 403
if error_code in ("NoSuchKey", "NoSuchBucket"):
return 404
if error_code == "InvalidRange":
return 416
# We "normalize" retriable server errors to 503. These are also considered
# transient by boto3 (see:
# https://boto3.amazonaws.com/v1/documentation/api/latest/guide/retries.html)
if error_code in (
"SlowDown",
"RequestTimeout",
"RequestTimeoutException",
"PriorRequestNotComplete",
"ConnectionError",
"HTTPClientError",
"Throttling",
"ThrottlingException",
"ThrottledException",
"RequestThrottledException",
"TooManyRequestsException",
"ProvisionedThroughputExceededException",
"TransactionInProgressException",
"RequestLimitExceeded",
"BandwidthLimitExceeded",
"LimitExceededException",
"RequestThrottled",
"EC2ThrottledException",
"InternalError",
):
return 503
pass

if error_code in PERMISSION_ERRORS:
return 403
elif error_code in NOT_FOUND_ERRORS:
return 404
elif error_code in RANGE_ERRORS:
return 416
elif error_code in FATAL_ERRORS:
return 400
elif error_code in TRANSIENT_ERRORS:
return 503

# Default: return original string code if unmapped
return error_code


Expand Down Expand Up @@ -199,6 +371,8 @@ def op_info(url):
to_return = {"error": ERROR_URL_ACCESS_DENIED, "raise_error": err}
elif error_code == 416:
to_return = {"error": ERROR_INVALID_RANGE, "raise_error": err}
elif error_code == 400:
to_return = {"error": ERROR_INVALID_REQUEST, "raise_error": err}
elif error_code in (500, 502, 503, 504):
to_return = {"error": ERROR_TRANSIENT, "raise_error": err}
else:
Expand Down Expand Up @@ -392,6 +566,9 @@ def handle_client_error(err, idx, result_file):
elif error_code == 403:
result_file.write("%d %d\n" % (idx, -ERROR_URL_ACCESS_DENIED))
result_file.flush()
elif error_code == 400:
result_file.write("%d %d\n" % (idx, -ERROR_INVALID_REQUEST))
result_file.flush()
elif error_code == 503:
result_file.write("%d %d\n" % (idx, -ERROR_TRANSIENT))
result_file.flush()
Expand Down Expand Up @@ -564,6 +741,8 @@ def get_info(self, url):
return False, url, ERROR_URL_NOT_FOUND
elif error_code == 403:
return False, url, ERROR_URL_ACCESS_DENIED
elif error_code == 400:
return False, url, ERROR_INVALID_REQUEST
# Transient errors are going to be retried by the aws_retry decorator
else:
raise
Expand Down Expand Up @@ -612,6 +791,8 @@ def list_prefix(self, prefix_url, delimiter=""):
return False, prefix_url, ERROR_URL_NOT_FOUND
elif error_code == 403:
return False, prefix_url, ERROR_URL_ACCESS_DENIED
elif error_code == 400:
return False, prefix_url, ERROR_INVALID_REQUEST
# Transient errors are going to be retried by the aws_retry decorator
else:
raise
Expand Down Expand Up @@ -655,6 +836,8 @@ def exit(exit_code, url):
msg = "Transient error for url: %s" % url
elif exit_code == ERROR_OUT_OF_DISK_SPACE:
msg = "Out of disk space when downloading URL: %s" % url
elif exit_code == ERROR_INVALID_REQUEST:
msg = "Invalid request for URL: %s" % url
else:
msg = "Unknown error"
print("s3op failed:\n%s" % msg, file=sys.stderr)
Expand Down
Loading