Skip to content

Commit 910a893

Browse files
authored
Expose S3 retry strategy (#2110)
<!-- Thanks for opening a pull request! --> <!-- In the case this PR will resolve an issue, please replace ${GITHUB_ISSUE_ID} below with the actual Github issue id. --> <!-- Closes #${GITHUB_ISSUE_ID} --> # Rationale for this change # Are these changes tested? # Are there any user-facing changes? <!-- In the case of user-facing changes, please add the changelog label. -->
1 parent c27028f commit 910a893

File tree

4 files changed

+61
-18
lines changed

4 files changed

+61
-18
lines changed

mkdocs/docs/configuration.md

Lines changed: 17 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -109,23 +109,24 @@ For the FileIO there are several configuration options available:
109109

110110
<!-- markdown-link-check-disable -->
111111

112-
| Key | Example | Description |
113-
|-----------------------------|----------------------------|-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------|
114-
| s3.endpoint | <https://10.0.19.25/> | Configure an alternative endpoint of the S3 service for the FileIO to access. This could be used to use S3FileIO with any s3-compatible object storage service that has a different endpoint, or access a private S3 endpoint in a virtual private cloud. |
115-
| s3.access-key-id | admin | Configure the static access key id used to access the FileIO. |
116-
| s3.secret-access-key | password | Configure the static secret access key used to access the FileIO. |
117-
| s3.session-token | AQoDYXdzEJr... | Configure the static session token used to access the FileIO. |
118-
| s3.role-session-name | session | An optional identifier for the assumed role session. |
119-
| s3.role-arn | arn:aws:... | AWS Role ARN. If provided instead of access_key and secret_key, temporary credentials will be fetched by assuming this role. |
120-
| s3.signer | bearer | Configure the signature version of the FileIO. |
121-
| s3.signer.uri | <http://my.signer:8080/s3> | Configure the remote signing uri if it differs from the catalog uri. Remote signing is only implemented for `FsspecFileIO`. The final request is sent to `<s3.signer.uri>/<s3.signer.endpoint>`. |
122-
| s3.signer.endpoint | v1/main/s3-sign | Configure the remote signing endpoint. Remote signing is only implemented for `FsspecFileIO`. The final request is sent to `<s3.signer.uri>/<s3.signer.endpoint>`. (default : v1/aws/s3/sign). |
123-
| s3.region | us-west-2 | Configure the default region used to initialize an `S3FileSystem`. `PyArrowFileIO` attempts to automatically tries to resolve the region if this isn't set (only supported for AWS S3 Buckets). |
124-
| s3.resolve-region | False | Only supported for `PyArrowFileIO`, when enabled, it will always try to resolve the location of the bucket (only supported for AWS S3 Buckets). |
125-
| s3.proxy-uri | <http://my.proxy.com:8080> | Configure the proxy server to be used by the FileIO. |
126-
| s3.connect-timeout | 60.0 | Configure socket connection timeout, in seconds. |
127-
| s3.request-timeout | 60.0 | Configure socket read timeouts on Windows and macOS, in seconds. |
112+
| Key | Example | Description |
113+
|-----------------------------|----------------------------|-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------|
114+
| s3.endpoint | <https://10.0.19.25/> | Configure an alternative endpoint of the S3 service for the FileIO to access. This could be used to use S3FileIO with any s3-compatible object storage service that has a different endpoint, or access a private S3 endpoint in a virtual private cloud. |
115+
| s3.access-key-id | admin | Configure the static access key id used to access the FileIO. |
116+
| s3.secret-access-key | password | Configure the static secret access key used to access the FileIO. |
117+
| s3.session-token | AQoDYXdzEJr... | Configure the static session token used to access the FileIO. |
118+
| s3.role-session-name | session | An optional identifier for the assumed role session. |
119+
| s3.role-arn | arn:aws:... | AWS Role ARN. If provided instead of access_key and secret_key, temporary credentials will be fetched by assuming this role. |
120+
| s3.signer | bearer | Configure the signature version of the FileIO. |
121+
| s3.signer.uri | <http://my.signer:8080/s3> | Configure the remote signing uri if it differs from the catalog uri. Remote signing is only implemented for `FsspecFileIO`. The final request is sent to `<s3.signer.uri>/<s3.signer.endpoint>`. |
122+
| s3.signer.endpoint | v1/main/s3-sign | Configure the remote signing endpoint. Remote signing is only implemented for `FsspecFileIO`. The final request is sent to `<s3.signer.uri>/<s3.signer.endpoint>`. (default : v1/aws/s3/sign). |
123+
| s3.region | us-west-2 | Configure the default region used to initialize an `S3FileSystem`. `PyArrowFileIO` attempts to automatically tries to resolve the region if this isn't set (only supported for AWS S3 Buckets). |
124+
| s3.resolve-region | False | Only supported for `PyArrowFileIO`, when enabled, it will always try to resolve the location of the bucket (only supported for AWS S3 Buckets). |
125+
| s3.proxy-uri | <http://my.proxy.com:8080> | Configure the proxy server to be used by the FileIO. |
126+
| s3.connect-timeout | 60.0 | Configure socket connection timeout, in seconds. |
127+
| s3.request-timeout | 60.0 | Configure socket read timeouts on Windows and macOS, in seconds. |
128128
| s3.force-virtual-addressing | False | Whether to use virtual addressing of buckets. If true, then virtual addressing is always enabled. If false, then virtual addressing is only enabled if endpoint_override is empty. This can be used for non-AWS backends that only support virtual hosted-style access. |
129+
| s3.retry-strategy-impl | None | Ability to set a custom S3 retry strategy. A full path to a class needs to be given that extends the [S3RetryStrategy](https://github.com/apache/arrow/blob/639201bfa412db26ce45e73851432018af6c945e/python/pyarrow/_s3fs.pyx#L110) base class. |
129130

130131
<!-- markdown-link-check-enable-->
131132

pyiceberg/io/__init__.py

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -68,6 +68,7 @@
6868
S3_ROLE_ARN = "s3.role-arn"
6969
S3_ROLE_SESSION_NAME = "s3.role-session-name"
7070
S3_FORCE_VIRTUAL_ADDRESSING = "s3.force-virtual-addressing"
71+
S3_RETRY_STRATEGY_IMPL = "s3.retry-strategy-impl"
7172
HDFS_HOST = "hdfs.host"
7273
HDFS_PORT = "hdfs.port"
7374
HDFS_USER = "hdfs.user"

pyiceberg/io/pyarrow.py

Lines changed: 22 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,7 @@
2727

2828
import fnmatch
2929
import functools
30+
import importlib
3031
import itertools
3132
import logging
3233
import operator
@@ -63,6 +64,7 @@
6364
import pyarrow.lib
6465
import pyarrow.parquet as pq
6566
from pyarrow import ChunkedArray
67+
from pyarrow._s3fs import S3RetryStrategy
6668
from pyarrow.fs import (
6769
FileInfo,
6870
FileSystem,
@@ -111,6 +113,7 @@
111113
S3_REGION,
112114
S3_REQUEST_TIMEOUT,
113115
S3_RESOLVE_REGION,
116+
S3_RETRY_STRATEGY_IMPL,
114117
S3_ROLE_ARN,
115118
S3_ROLE_SESSION_NAME,
116119
S3_SECRET_ACCESS_KEY,
@@ -214,6 +217,20 @@ def _cached_resolve_s3_region(bucket: str) -> Optional[str]:
214217
return None
215218

216219

220+
def _import_retry_strategy(impl: str) -> Optional[S3RetryStrategy]:
221+
try:
222+
path_parts = impl.split(".")
223+
if len(path_parts) < 2:
224+
raise ValueError(f"retry-strategy-impl should be full path (module.CustomS3RetryStrategy), got: {impl}")
225+
module_name, class_name = ".".join(path_parts[:-1]), path_parts[-1]
226+
module = importlib.import_module(module_name)
227+
class_ = getattr(module, class_name)
228+
return class_()
229+
except (ModuleNotFoundError, AttributeError):
230+
warnings.warn(f"Could not initialize S3 retry strategy: {impl}")
231+
return None
232+
233+
217234
class UnsupportedPyArrowTypeException(Exception):
218235
"""Cannot convert PyArrow type to corresponding Iceberg type."""
219236

@@ -476,6 +493,11 @@ def _initialize_s3_fs(self, netloc: Optional[str]) -> FileSystem:
476493
if self.properties.get(S3_FORCE_VIRTUAL_ADDRESSING) is not None:
477494
client_kwargs["force_virtual_addressing"] = property_as_bool(self.properties, S3_FORCE_VIRTUAL_ADDRESSING, False)
478495

496+
if (retry_strategy_impl := self.properties.get(S3_RETRY_STRATEGY_IMPL)) and (
497+
retry_instance := _import_retry_strategy(retry_strategy_impl)
498+
):
499+
client_kwargs["retry_strategy"] = retry_instance
500+
479501
return S3FileSystem(**client_kwargs)
480502

481503
def _initialize_azure_fs(self) -> FileSystem:

tests/io/test_pyarrow.py

Lines changed: 21 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@
1919
import os
2020
import tempfile
2121
import uuid
22+
import warnings
2223
from datetime import date
2324
from typing import Any, List, Optional
2425
from unittest.mock import MagicMock, patch
@@ -29,7 +30,7 @@
2930
import pyarrow.parquet as pq
3031
import pytest
3132
from packaging import version
32-
from pyarrow.fs import FileType, LocalFileSystem, S3FileSystem
33+
from pyarrow.fs import AwsDefaultS3RetryStrategy, FileType, LocalFileSystem, S3FileSystem
3334

3435
from pyiceberg.exceptions import ResolveError
3536
from pyiceberg.expressions import (
@@ -57,7 +58,7 @@
5758
Or,
5859
)
5960
from pyiceberg.expressions.literals import literal
60-
from pyiceberg.io import InputStream, OutputStream, load_file_io
61+
from pyiceberg.io import S3_RETRY_STRATEGY_IMPL, InputStream, OutputStream, load_file_io
6162
from pyiceberg.io.pyarrow import (
6263
ICEBERG_SCHEMA,
6364
ArrowScan,
@@ -2512,3 +2513,21 @@ def test_pyarrow_io_multi_fs() -> None:
25122513

25132514
# Same PyArrowFileIO instance resolves local file input to LocalFileSystem
25142515
assert isinstance(pyarrow_file_io.new_input("file:///path/to/file")._filesystem, LocalFileSystem)
2516+
2517+
2518+
class SomeRetryStrategy(AwsDefaultS3RetryStrategy):
2519+
def __init__(self) -> None:
2520+
super().__init__()
2521+
warnings.warn("Initialized SomeRetryStrategy 👍")
2522+
2523+
2524+
def test_retry_strategy() -> None:
2525+
io = PyArrowFileIO(properties={S3_RETRY_STRATEGY_IMPL: "tests.io.test_pyarrow.SomeRetryStrategy"})
2526+
with pytest.warns(UserWarning, match="Initialized SomeRetryStrategy.*"):
2527+
io.new_input("s3://bucket/path/to/file")
2528+
2529+
2530+
def test_retry_strategy_not_found() -> None:
2531+
io = PyArrowFileIO(properties={S3_RETRY_STRATEGY_IMPL: "pyiceberg.DoesNotExist"})
2532+
with pytest.warns(UserWarning, match="Could not initialize S3 retry strategy: pyiceberg.DoesNotExist"):
2533+
io.new_input("s3://bucket/path/to/file")

0 commit comments

Comments
 (0)