Commit b9d0249

npow and savingoyal authored
This PR fixes several issues which caused s3op to be stuck: 1. Need to call `queue.cancel_join_thread()` so that the workers can exit without flushing the queue, otherwise there is a deadlock 2. Catch the correct exceptions in download/upload (they don't actually raise `ClientError`) 3. Handle `InternalError` 4. Handle `SSLError` 5. Optimistically assume all other unhandled exceptions are transient Additional improvements: 1. Added exponential backoff to `jitter_sleep()` 2. Set default retry config in s3op.py to match that in aws_client.py 3. Fix a bug where the retry setting was not being applied properly if config was missing 4. Fail early on fatal errors 5. Don't restart from scratch when there's no progress --------- Co-authored-by: npow <npow@netflix.com> Co-authored-by: Savin <savingoyal@gmail.com>
1 parent 2e97e51 commit b9d0249
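A note on the first fix: a `multiprocessing.Queue` joins its feeder thread when the owning process exits, which blocks until buffered items are flushed; if the reader is no longer draining the queue, the worker can hang forever. Below is a minimal, self-contained sketch of that mechanism -- the `worker` and `result_queue` names are hypothetical and this is not the actual s3op.py code -- showing how `cancel_join_thread()` lets a worker exit without flushing.

```python
# Minimal sketch (hypothetical names, not the real s3op.py workers): a worker
# calls cancel_join_thread() on its result queue so that, on exit, it does not
# block waiting for the queue's feeder thread to flush buffered items.
import multiprocessing


def worker(result_queue):
    # Allow this process to exit without flushing whatever is still buffered
    # in the queue -- the deadlock fix described in item 1 of the summary.
    result_queue.cancel_join_thread()
    for i in range(10):
        result_queue.put(i)


if __name__ == "__main__":
    q = multiprocessing.Queue()
    p = multiprocessing.Process(target=worker, args=(q,))
    p.start()
    p.join()
    # Drain whatever made it into the queue before the worker exited.
    while not q.empty():
        print(q.get())
```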

File tree

6 files changed: +203 -111 lines changed


.github/workflows/metaflow.s3_tests.yml (+3 -1)

@@ -14,7 +14,9 @@ permissions: read-all
 
 jobs:
   test_data:
-    if: (github.event.action == 'labeled' && (github.event.label.name == 'approved' || github.event.label.name == 'ok-to-test')) || (github.event.action != 'labeled' && (contains(github.event.pull_request.labels.*.name, 'ok-to-test') || contains(github.event.pull_request.labels.*.name, 'approved')))
+    # NOTE: temporarily disable s3 tests since they will fail due to 403
+    # https://netflix.slack.com/archives/C023Y9JH36W/p1740806169172489?thread_ts=1740180557.110859&cid=C023Y9JH36W
+    if: false && ((github.event.action == 'labeled' && (github.event.label.name == 'approved' || github.event.label.name == 'ok-to-test')) || (github.event.action != 'labeled' && (contains(github.event.pull_request.labels.*.name, 'ok-to-test') || contains(github.event.pull_request.labels.*.name, 'approved'))))
     name: metaflow.s3 / Python ${{ matrix.ver }} on ${{ matrix.os }}
     runs-on: ${{ matrix.os }}
     strategy:

metaflow/metaflow_config.py (+6)

@@ -109,6 +109,12 @@
 # top-level retries)
 S3_TRANSIENT_RETRY_COUNT = from_conf("S3_TRANSIENT_RETRY_COUNT", 20)
 
+# S3 retry configuration used in the aws client
+# Use the adaptive retry strategy by default
+S3_CLIENT_RETRY_CONFIG = from_conf(
+    "S3_CLIENT_RETRY_CONFIG", {"max_attempts": 10, "mode": "adaptive"}
+)
+
 # Threshold to start printing warnings for an AWS retry
 RETRY_WARNING_THRESHOLD = 3
 

metaflow/plugins/aws/aws_client.py (+4 -3)

@@ -14,6 +14,7 @@ def get_client(
        AWS_SANDBOX_ENABLED,
        AWS_SANDBOX_STS_ENDPOINT_URL,
        AWS_SANDBOX_API_KEY,
+       S3_CLIENT_RETRY_CONFIG,
    )
 
    if session_vars is None:
@@ -37,10 +38,10 @@ def get_client(
    if module == "s3" and (
        "config" not in client_params or client_params["config"].retries is None
    ):
-       # Use the adaptive retry strategy by default -- do not set anything if
-       # the user has already set something
+       # do not set anything if the user has already set something
        config = client_params.get("config", Config())
-       config.retries = {"max_attempts": 10, "mode": "adaptive"}
+       config.retries = S3_CLIENT_RETRY_CONFIG
+       client_params["config"] = config
 
    if AWS_SANDBOX_ENABLED:
        # role is ignored in the sandbox
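For context on the change above: the retry dictionary is botocore's standard retry configuration. Here is a minimal standalone sketch (not the real `get_client()` code; the region name is an arbitrary placeholder) of how such a dict ends up applied to an S3 client:

```python
# Sketch only: shows how a retries dict like S3_CLIENT_RETRY_CONFIG is applied
# to an S3 client through botocore's Config object. get_client() does the
# equivalent, while also leaving any user-supplied retry config untouched.
import boto3
from botocore.config import Config

# Same default as S3_CLIENT_RETRY_CONFIG in metaflow_config.py
retry_config = {"max_attempts": 10, "mode": "adaptive"}

config = Config(region_name="us-east-1")  # region is arbitrary for this sketch
config.retries = retry_config

# Passing the config at client creation is the step that was previously missing
# when no config was supplied (the client_params["config"] = config fix above).
s3 = boto3.client("s3", config=config)
```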

metaflow/plugins/datatools/s3/s3.py (+46 -44)

@@ -18,6 +18,7 @@
     S3_RETRY_COUNT,
     S3_TRANSIENT_RETRY_COUNT,
     S3_SERVER_SIDE_ENCRYPTION,
+    S3_WORKER_COUNT,
     TEMPDIR,
 )
 from metaflow.util import (
@@ -1390,9 +1391,31 @@ def _one_boto_op(self, op, url, create_tmp_file=True):
         )
 
     # add some jitter to make sure retries are not synchronized
-    def _jitter_sleep(self, trynum, multiplier=2):
-        interval = multiplier**trynum + random.randint(0, 10)
-        time.sleep(interval)
+    def _jitter_sleep(
+        self, trynum: int, base: int = 2, cap: int = 360, jitter: float = 0.1
+    ) -> None:
+        """
+        Sleep for an exponentially increasing interval with added jitter.
+
+        Parameters
+        ----------
+        trynum: The current retry attempt number.
+        base: The base multiplier for the exponential backoff.
+        cap: The maximum interval to sleep.
+        jitter: The maximum jitter percentage to add to the interval.
+        """
+        # Calculate the exponential backoff interval
+        interval = min(cap, base**trynum)
+
+        # Add random jitter
+        jitter_value = interval * jitter * random.uniform(-1, 1)
+        interval_with_jitter = interval + jitter_value
+
+        # Ensure the interval is not negative
+        interval_with_jitter = max(0, interval_with_jitter)
+
+        # Sleep for the calculated interval
+        time.sleep(interval_with_jitter)
 
     # NOTE: re: _read_many_files and _put_many_files
     # All file IO is through binary files - we write bytes, we read
@@ -1480,20 +1503,17 @@ def _s3op_with_retries(self, mode, **options):
        # - a known transient failure (SlowDown for example) in which case we will
        #   retry *only* the inputs that have this transient failure.
        # - an unknown failure (something went wrong but we cannot say if it was
-       #   a known permanent failure or something else). In this case, we retry
-       #   the operation completely.
-       #
-       # There are therefore two retry counts:
-       # - the transient failure retry count: how many times do we try on known
-       #   transient errors
-       # - the top-level retry count: how many times do we try on unknown failures
+       #   a known permanent failure or something else). In this case, we assume
+       #   it's a transient failure and retry only those inputs (same as above).
        #
-       # Note that, if the operation runs out of transient failure retries, it will
-       # count as an "unknown" failure (ie: it will be retried according to the
-       # outer top-level retry count). In other words, you can potentially have
-       # transient_retry_count * retry_count tries).
-       # Finally, if on transient failures, we make NO progress (ie: no input is
-       # successfully processed), that counts as an "unknown" failure.
+       # NOTES(npow): 2025-05-13
+       # Previously, this code would also retry the fatal failures, including no_progress
+       # and unknown failures, from the beginning. This is not ideal because:
+       # 1. Fatal errors are not supposed to be retried.
+       # 2. Retrying from the beginning does not improve the situation, and is
+       #    wasteful since we have already uploaded some files.
+       # 3. The number of transient errors is far more than fatal errors, so we
+       #    can be optimistic and assume the unknown errors are transient.
        cmdline = [sys.executable, os.path.abspath(s3op.__file__), mode]
        recursive_get = False
        for key, value in options.items():
@@ -1528,7 +1548,6 @@ def _inject_failure_rate():
            # Otherwise, we cap the failure rate at 90%
            return min(90, self._s3_inject_failures)
 
-       retry_count = 0  # Number of retries (excluding transient failures)
        transient_retry_count = 0  # Number of transient retries (per top-level retry)
        inject_failures = _inject_failure_rate()
        out_lines = []  # List to contain the lines returned by _s3op_with_retries
@@ -1595,7 +1614,12 @@ def try_s3_op(last_ok_count, pending_retries, out_lines, inject_failures):
                # things, this will shrink more and more until we are doing a
                # single operation at a time. If things start going better, it
                # will increase by 20% every round.
-               max_count = min(int(last_ok_count * 1.2), len(pending_retries))
+               #
+               # If we made no progress (last_ok_count == 0) we retry at most
+               # 2*S3_WORKER_COUNT from whatever is left in `pending_retries`
+               max_count = min(
+                   int(last_ok_count * 1.2), len(pending_retries)
+               ) or min(2 * S3_WORKER_COUNT, len(pending_retries))
                tmp_input.writelines(pending_retries[:max_count])
                tmp_input.flush()
                debug.s3client_exec(
@@ -1712,38 +1736,16 @@ def try_s3_op(last_ok_count, pending_retries, out_lines, inject_failures):
            _update_out_lines(out_lines, ok_lines, resize=loop_count == 0)
            return 0, 0, inject_failures, err_out
 
-       while retry_count <= S3_RETRY_COUNT:
+       while transient_retry_count <= S3_TRANSIENT_RETRY_COUNT:
            (
                last_ok_count,
                last_retry_count,
                inject_failures,
                err_out,
            ) = try_s3_op(last_ok_count, pending_retries, out_lines, inject_failures)
-           if err_out or (
-               last_retry_count != 0
-               and (
-                   last_ok_count == 0
-                   or transient_retry_count > S3_TRANSIENT_RETRY_COUNT
-               )
-           ):
-               # We had a fatal failure (err_out is not None)
-               # or we made no progress (last_ok_count is 0)
-               # or we are out of transient retries
-               # so we will restart from scratch (being very conservative)
-               retry_count += 1
-               err_msg = err_out
-               if err_msg is None and last_ok_count == 0:
-                   err_msg = "No progress"
-               if err_msg is None:
-                   err_msg = "Too many transient errors"
-               print(
-                   "S3 non-transient error (attempt #%d): %s" % (retry_count, err_msg)
-               )
-               _reset()
-               if retry_count <= S3_RETRY_COUNT:
-                   self._jitter_sleep(retry_count)
-               continue
-           elif last_retry_count != 0:
+           if err_out:
+               break
+           if last_retry_count != 0:
               # During our last try, we did not manage to process everything we wanted
               # due to a transient failure so we try again.
               transient_retry_count += 1
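The new `_jitter_sleep()` in the diff above is a capped exponential backoff with roughly ±10% jitter. A standalone sketch of just the interval math (reimplemented outside the S3 class, so the function name here is illustrative) shows the intervals it produces across attempts:

```python
# Standalone sketch of the interval math used by the new _jitter_sleep():
# capped exponential backoff, min(cap, base**trynum), with +/- `jitter` noise.
import random


def backoff_interval(trynum, base=2, cap=360, jitter=0.1):
    interval = min(cap, base**trynum)
    # Jitter is a fraction of the interval, drawn uniformly in [-jitter, +jitter]
    interval += interval * jitter * random.uniform(-1, 1)
    return max(0, interval)


if __name__ == "__main__":
    for trynum in range(1, 12):
        # Grows 2, 4, 8, ... seconds and saturates around 360s (plus/minus 10%)
        print(trynum, round(backoff_interval(trynum), 2))
```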
