Skip to content

Commit 84c91f0

Browse files
Support ADLS with Pyarrow file IO (#2111)
<!-- Thanks for opening a pull request! --> <!-- In the case this PR will resolve an issue, please replace ${GITHUB_ISSUE_ID} below with the actual Github issue id. --> <!-- Closes #${GITHUB_ISSUE_ID} --> # Rationale for this change Starting from version 20, PyArrow supports ADLS filesystem. This PR adds Pyarrow Azure support to Pyiceberg. PyArrow is the [default IO](https://github.com/apache/iceberg-python/blob/main/pyiceberg/io/__init__.py#L366-L369) for Pyiceberg catalogs. In Azure environment it handles wider spectrum of auth strategies then Fsspec, including, for instance, [Managed Identities](https://learn.microsoft.com/en-us/entra/identity/managed-identities-azure-resources/overview). Also, prior to this PR #1663 (that is not merged yet) there was no support for wasb(s) with Fsspec. See the corresponding issue for more details: #2112 # Are these changes tested? Tests are added under tests/io/test_pyarrow.py. # Are there any user-facing changes? There are no API breaking changes. Direct impact of the PR: Pyarrow FileIO in Pyiceberg supports Azure cloud environment. Examples of impact for final users: - Pyiceberg is usable in services with Managed Identities auth strategy. - Pyiceberg is usable with wasb(s) schemes in Azure. <!-- In the case of user-facing changes, please add the changelog label. --> --------- Co-authored-by: Kevin Liu <kevinjqliu@users.noreply.github.com> Co-authored-by: Kevin Liu <kevin.jq.liu@gmail.com>
1 parent 2170fba commit 84c91f0

File tree

5 files changed

+321
-34
lines changed

5 files changed

+321
-34
lines changed

mkdocs/docs/configuration.md

Lines changed: 14 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -146,16 +146,20 @@ For the FileIO there are several configuration options available:
146146

147147
<!-- markdown-link-check-disable -->
148148

149-
| Key | Example | Description |
150-
| ---------------------- | ----------------------------------------------------------------------------------------- | -------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------- |
151-
| adls.connection-string | AccountName=devstoreaccount1;AccountKey=Eby8vdM02xNOcqF...;BlobEndpoint=<http://localhost/> | A [connection string](https://learn.microsoft.com/en-us/azure/storage/common/storage-configure-connection-string). This could be used to use FileIO with any adls-compatible object storage service that has a different endpoint (like [azurite](https://github.com/azure/azurite)). |
152-
| adls.account-name | devstoreaccount1 | The account that you want to connect to |
153-
| adls.account-key | Eby8vdM02xNOcqF... | The key to authentication against the account. |
154-
| adls.sas-token | NuHOuuzdQN7VRM%2FOpOeqBlawRCA845IY05h9eu1Yte4%3D | The shared access signature |
155-
| adls.tenant-id | ad667be4-b811-11ed-afa1-0242ac120002 | The tenant-id |
156-
| adls.client-id | ad667be4-b811-11ed-afa1-0242ac120002 | The client-id |
157-
| adls.client-secret | oCA3R6P\*ka#oa1Sms2J74z... | The client-secret |
158-
| adls.account-host | accountname1.blob.core.windows.net | The storage account host. See [AzureBlobFileSystem](https://github.com/fsspec/adlfs/blob/adb9c53b74a0d420625b86dd00fbe615b43201d2/adlfs/spec.py#L125) for reference |
149+
| Key | Example | Description |
150+
|------------------------------|---------------------------------------------------------------------------------------------|---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------|
151+
| adls.connection-string | AccountName=devstoreaccount1;AccountKey=Eby8vdM02xNOcqF...;BlobEndpoint=<http://localhost/> | A [connection string](https://learn.microsoft.com/en-us/azure/storage/common/storage-configure-connection-string). This could be used to use FileIO with any adls-compatible object storage service that has a different endpoint (like [azurite](https://github.com/azure/azurite)). |
152+
| adls.account-name | devstoreaccount1 | The account that you want to connect to |
153+
| adls.account-key | Eby8vdM02xNOcqF... | The key to authentication against the account. |
154+
| adls.sas-token | NuHOuuzdQN7VRM%2FOpOeqBlawRCA845IY05h9eu1Yte4%3D | The shared access signature |
155+
| adls.tenant-id | ad667be4-b811-11ed-afa1-0242ac120002 | The tenant-id |
156+
| adls.client-id | ad667be4-b811-11ed-afa1-0242ac120002 | The client-id |
157+
| adls.client-secret | oCA3R6P\*ka#oa1Sms2J74z... | The client-secret |
158+
| adls.account-host | accountname1.blob.core.windows.net | The storage account host. See [AzureBlobFileSystem](https://github.com/fsspec/adlfs/blob/adb9c53b74a0d420625b86dd00fbe615b43201d2/adlfs/spec.py#L125) for reference |
159+
| adls.blob-storage-authority | .blob.core.windows.net | The hostname[:port] of the Blob Service. Defaults to `.blob.core.windows.net`. Useful for connecting to a local emulator, like [azurite](https://github.com/azure/azurite). See [AzureFileSystem](https://arrow.apache.org/docs/python/filesystems.html#azure-storage-file-system) for reference |
160+
| adls.dfs-storage-authority | .dfs.core.windows.net | The hostname[:port] of the Data Lake Gen 2 Service. Defaults to `.dfs.core.windows.net`. Useful for connecting to a local emulator, like [azurite](https://github.com/azure/azurite). See [AzureFileSystem](https://arrow.apache.org/docs/python/filesystems.html#azure-storage-file-system) for reference |
161+
| adls.blob-storage-scheme | https | Either `http` or `https`. Defaults to `https`. Useful for connecting to a local emulator, like [azurite](https://github.com/azure/azurite). See [AzureFileSystem](https://arrow.apache.org/docs/python/filesystems.html#azure-storage-file-system) for reference |
162+
| adls.dfs-storage-scheme | https | Either `http` or `https`. Defaults to `https`. Useful for connecting to a local emulator, like [azurite](https://github.com/azure/azurite). See [AzureFileSystem](https://arrow.apache.org/docs/python/filesystems.html#azure-storage-file-system) for reference |
159163

160164
<!-- markdown-link-check-enable-->
161165

pyiceberg/io/__init__.py

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -80,6 +80,10 @@
8080
ADLS_CLIENT_ID = "adls.client-id"
8181
ADLS_CLIENT_SECRET = "adls.client-secret"
8282
ADLS_ACCOUNT_HOST = "adls.account-host"
83+
ADLS_BLOB_STORAGE_AUTHORITY = "adls.blob-storage-authority"
84+
ADLS_DFS_STORAGE_AUTHORITY = "adls.dfs-storage-authority"
85+
ADLS_BLOB_STORAGE_SCHEME = "adls.blob-storage-scheme"
86+
ADLS_DFS_STORAGE_SCHEME = "adls.dfs-storage-scheme"
8387
GCS_TOKEN = "gcs.oauth2.token"
8488
GCS_TOKEN_EXPIRES_AT_MS = "gcs.oauth2.token-expires-at"
8589
GCS_PROJECT_ID = "gcs.project-id"

pyiceberg/io/pyarrow.py

Lines changed: 47 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -84,6 +84,13 @@
8484
)
8585
from pyiceberg.expressions.visitors import visit as boolean_expression_visit
8686
from pyiceberg.io import (
87+
ADLS_ACCOUNT_KEY,
88+
ADLS_ACCOUNT_NAME,
89+
ADLS_BLOB_STORAGE_AUTHORITY,
90+
ADLS_BLOB_STORAGE_SCHEME,
91+
ADLS_DFS_STORAGE_AUTHORITY,
92+
ADLS_DFS_STORAGE_SCHEME,
93+
ADLS_SAS_TOKEN,
8794
AWS_ACCESS_KEY_ID,
8895
AWS_REGION,
8996
AWS_ROLE_ARN,
@@ -390,6 +397,9 @@ def _initialize_fs(self, scheme: str, netloc: Optional[str] = None) -> FileSyste
390397
elif scheme in {"gs", "gcs"}:
391398
return self._initialize_gcs_fs()
392399

400+
elif scheme in {"abfs", "abfss", "wasb", "wasbs"}:
401+
return self._initialize_azure_fs()
402+
393403
elif scheme in {"file"}:
394404
return self._initialize_local_fs()
395405

@@ -471,6 +481,43 @@ def _initialize_s3_fs(self, netloc: Optional[str]) -> FileSystem:
471481

472482
return S3FileSystem(**client_kwargs)
473483

484+
def _initialize_azure_fs(self) -> FileSystem:
485+
from packaging import version
486+
487+
MIN_PYARROW_VERSION_SUPPORTING_AZURE_FS = "20.0.0"
488+
if version.parse(pyarrow.__version__) < version.parse(MIN_PYARROW_VERSION_SUPPORTING_AZURE_FS):
489+
raise ImportError(
490+
f"pyarrow version >= {MIN_PYARROW_VERSION_SUPPORTING_AZURE_FS} required for AzureFileSystem support, "
491+
f"but found version {pyarrow.__version__}."
492+
)
493+
494+
from pyarrow.fs import AzureFileSystem
495+
496+
client_kwargs: Dict[str, str] = {}
497+
498+
if account_name := self.properties.get(ADLS_ACCOUNT_NAME):
499+
client_kwargs["account_name"] = account_name
500+
501+
if account_key := self.properties.get(ADLS_ACCOUNT_KEY):
502+
client_kwargs["account_key"] = account_key
503+
504+
if blob_storage_authority := self.properties.get(ADLS_BLOB_STORAGE_AUTHORITY):
505+
client_kwargs["blob_storage_authority"] = blob_storage_authority
506+
507+
if dfs_storage_authority := self.properties.get(ADLS_DFS_STORAGE_AUTHORITY):
508+
client_kwargs["dfs_storage_authority"] = dfs_storage_authority
509+
510+
if blob_storage_scheme := self.properties.get(ADLS_BLOB_STORAGE_SCHEME):
511+
client_kwargs["blob_storage_scheme"] = blob_storage_scheme
512+
513+
if dfs_storage_scheme := self.properties.get(ADLS_DFS_STORAGE_SCHEME):
514+
client_kwargs["dfs_storage_scheme"] = dfs_storage_scheme
515+
516+
if sas_token := self.properties.get(ADLS_SAS_TOKEN):
517+
client_kwargs["sas_token"] = sas_token
518+
519+
return AzureFileSystem(**client_kwargs)
520+
474521
def _initialize_hdfs_fs(self, scheme: str, netloc: Optional[str]) -> FileSystem:
475522
from pyarrow.fs import HadoopFileSystem
476523

tests/conftest.py

Lines changed: 59 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -52,6 +52,12 @@
5252
from pyiceberg.catalog.noop import NoopCatalog
5353
from pyiceberg.expressions import BoundReference
5454
from pyiceberg.io import (
55+
ADLS_ACCOUNT_KEY,
56+
ADLS_ACCOUNT_NAME,
57+
ADLS_BLOB_STORAGE_AUTHORITY,
58+
ADLS_BLOB_STORAGE_SCHEME,
59+
ADLS_DFS_STORAGE_AUTHORITY,
60+
ADLS_DFS_STORAGE_SCHEME,
5561
GCS_PROJECT_ID,
5662
GCS_SERVICE_HOST,
5763
GCS_TOKEN,
@@ -348,6 +354,11 @@ def table_schema_with_all_types() -> Schema:
348354
)
349355

350356

357+
@pytest.fixture(params=["abfs", "abfss", "wasb", "wasbs"])
358+
def adls_scheme(request: pytest.FixtureRequest) -> str:
359+
return request.param
360+
361+
351362
@pytest.fixture(scope="session")
352363
def pyarrow_schema_simple_without_ids() -> "pa.Schema":
353364
import pyarrow as pa
@@ -2088,6 +2099,26 @@ def fsspec_fileio_gcs(request: pytest.FixtureRequest) -> FsspecFileIO:
20882099
return fsspec.FsspecFileIO(properties=properties)
20892100

20902101

2102+
@pytest.fixture
2103+
def adls_fsspec_fileio(request: pytest.FixtureRequest) -> Generator[FsspecFileIO, None, None]:
2104+
from azure.storage.blob import BlobServiceClient
2105+
2106+
azurite_url = request.config.getoption("--adls.endpoint")
2107+
azurite_account_name = request.config.getoption("--adls.account-name")
2108+
azurite_account_key = request.config.getoption("--adls.account-key")
2109+
azurite_connection_string = f"DefaultEndpointsProtocol=http;AccountName={azurite_account_name};AccountKey={azurite_account_key};BlobEndpoint={azurite_url}/{azurite_account_name};"
2110+
properties = {
2111+
"adls.connection-string": azurite_connection_string,
2112+
"adls.account-name": azurite_account_name,
2113+
}
2114+
2115+
bbs = BlobServiceClient.from_connection_string(conn_str=azurite_connection_string)
2116+
bbs.create_container("tests")
2117+
yield fsspec.FsspecFileIO(properties=properties)
2118+
bbs.delete_container("tests")
2119+
bbs.close()
2120+
2121+
20912122
@pytest.fixture
20922123
def pyarrow_fileio_gcs(request: pytest.FixtureRequest) -> "PyArrowFileIO":
20932124
from pyiceberg.io.pyarrow import PyArrowFileIO
@@ -2101,6 +2132,34 @@ def pyarrow_fileio_gcs(request: pytest.FixtureRequest) -> "PyArrowFileIO":
21012132
return PyArrowFileIO(properties=properties)
21022133

21032134

2135+
@pytest.fixture
2136+
def pyarrow_fileio_adls(request: pytest.FixtureRequest) -> Generator[Any, None, None]:
2137+
from azure.storage.blob import BlobServiceClient
2138+
2139+
from pyiceberg.io.pyarrow import PyArrowFileIO
2140+
2141+
azurite_url = request.config.getoption("--adls.endpoint")
2142+
azurite_scheme, azurite_authority = azurite_url.split("://", 1)
2143+
2144+
azurite_account_name = request.config.getoption("--adls.account-name")
2145+
azurite_account_key = request.config.getoption("--adls.account-key")
2146+
azurite_connection_string = f"DefaultEndpointsProtocol=http;AccountName={azurite_account_name};AccountKey={azurite_account_key};BlobEndpoint={azurite_url}/{azurite_account_name};"
2147+
properties = {
2148+
ADLS_ACCOUNT_NAME: azurite_account_name,
2149+
ADLS_ACCOUNT_KEY: azurite_account_key,
2150+
ADLS_BLOB_STORAGE_AUTHORITY: azurite_authority,
2151+
ADLS_DFS_STORAGE_AUTHORITY: azurite_authority,
2152+
ADLS_BLOB_STORAGE_SCHEME: azurite_scheme,
2153+
ADLS_DFS_STORAGE_SCHEME: azurite_scheme,
2154+
}
2155+
2156+
bbs = BlobServiceClient.from_connection_string(conn_str=azurite_connection_string)
2157+
bbs.create_container("warehouse")
2158+
yield PyArrowFileIO(properties=properties)
2159+
bbs.delete_container("warehouse")
2160+
bbs.close()
2161+
2162+
21042163
def aws_credentials() -> None:
21052164
os.environ["AWS_ACCESS_KEY_ID"] = "testing"
21062165
os.environ["AWS_SECRET_ACCESS_KEY"] = "testing"
@@ -2162,26 +2221,6 @@ def fixture_dynamodb(_aws_credentials: None) -> Generator[boto3.client, None, No
21622221
yield boto3.client("dynamodb", region_name="us-east-1")
21632222

21642223

2165-
@pytest.fixture
2166-
def adls_fsspec_fileio(request: pytest.FixtureRequest) -> Generator[FsspecFileIO, None, None]:
2167-
from azure.storage.blob import BlobServiceClient
2168-
2169-
azurite_url = request.config.getoption("--adls.endpoint")
2170-
azurite_account_name = request.config.getoption("--adls.account-name")
2171-
azurite_account_key = request.config.getoption("--adls.account-key")
2172-
azurite_connection_string = f"DefaultEndpointsProtocol=http;AccountName={azurite_account_name};AccountKey={azurite_account_key};BlobEndpoint={azurite_url}/{azurite_account_name};"
2173-
properties = {
2174-
"adls.connection-string": azurite_connection_string,
2175-
"adls.account-name": azurite_account_name,
2176-
}
2177-
2178-
bbs = BlobServiceClient.from_connection_string(conn_str=azurite_connection_string)
2179-
bbs.create_container("tests")
2180-
yield fsspec.FsspecFileIO(properties=properties)
2181-
bbs.delete_container("tests")
2182-
bbs.close()
2183-
2184-
21852224
@pytest.fixture(scope="session")
21862225
def empty_home_dir_path(tmp_path_factory: pytest.TempPathFactory) -> str:
21872226
home_path = str(tmp_path_factory.mktemp("home"))

0 commit comments

Comments
 (0)