
Commit 2a5fb97

Fokko authored and amitgilad3 committed
Prefer FileIO over the PyArrow FileSystem (apache#2115)
# Rationale for this change

Opening streams directly through the PyArrow `FileSystem` is problematic if you try to implement your own `FileIO`: streams then get opened both through the `FileIO` and through the `FileSystem` directly.

# Are these changes tested?

Yes, existing tests.

# Are there any user-facing changes?

No, but I think removing this complexity also makes the code aesthetically more pleasing.

# Numbers

A while ago I did some inspection of the calls being made to S3, so just to be sure that we don't alter anything, I've collected some stats using a small "benchmark" locally:

```python
import time

from pyarrow import parquet as pq
from pyiceberg.catalog.rest import RestCatalog


def test_fokko(session_catalog: RestCatalog):
    parquet_file = "/Users/fokko.driesprong/Downloads/yellow_tripdata_2024-01.parquet"
    df = pq.read_table(parquet_file)

    try:
        session_catalog.drop_table("default.taxi")
    except Exception:
        pass

    tbl = session_catalog.create_table("default.taxi", schema=df.schema)

    with tbl.update_spec() as tx:
        tx.add_field("tpep_pickup_datetime", "hour")

    tbl.append(df)

    rounds = []
    for _ in range(22):
        start = round(time.time() * 1000)
        assert len(tbl.scan().to_arrow()) == 2964624
        stop = round(time.time() * 1000)
        rounds.append(stop - start)

    print(f"Took: {sum(rounds) / len(rounds)} ms on average")
```

Main: Took: 1715.1818181818182 ms on average

```
> mc admin trace --stats minio
Call                        Count       RPM    Avg Time  Min Time  Max Time  Avg TTFB  Max TTFB  Avg Size     Rate /min    Errors
s3.GetObject                77 (29.2%)  697.9  701µs     153µs     1.6ms     463µs     838µs     ↑159B ↓712K  ↑108K ↓485M  0
s3.HeadObject               73 (27.7%)  661.6  192µs     107µs     735µs     177µs     719µs     ↑153B        ↑99K         0
s3.CompleteMultipartUpload  37 (14.0%)  335.4  8.2ms     1.9ms     17.5ms    8.2ms     17.5ms    ↑397B ↓507B  ↑130K ↓166K  0
s3.NewMultipartUpload       37 (14.0%)  335.4  6.2ms     2.1ms     14.2ms    6.1ms     14.1ms    ↑153B ↓437B  ↑50K ↓143K   0
s3.PutObjectPart            37 (14.0%)  335.4  18.4ms    5.1ms     38.8ms    18.4ms    38.8ms    ↑1.4M        ↑469M        0
s3.PutObject                3 (1.1%)    27.2   5.4ms     3.4ms     8.8ms     5.3ms     8.8ms     ↑2.8K        ↑75K         0
```

Branch: Took: 1723.1818181818182 ms on average

```
> mc admin trace --stats minio
Call                        Count       RPM    Avg Time  Min Time  Max Time  Avg TTFB  Max TTFB  Avg Size     Rate /min    Errors
s3.GetObject                77 (29.2%)  696.3  927µs     171µs     4.5ms     610µs     3.5ms     ↑159B ↓712K  ↑108K ↓484M  0
s3.HeadObject               73 (27.7%)  660.1  222µs     109µs     1.2ms     205µs     1.2ms     ↑153B        ↑99K         0
s3.CompleteMultipartUpload  37 (14.0%)  334.6  4.4ms     1.2ms     14.2ms    4.4ms     14.2ms    ↑397B ↓507B  ↑130K ↓166K  0
s3.NewMultipartUpload       37 (14.0%)  334.6  4.3ms     1.2ms     15ms      4.3ms     15ms      ↑153B ↓437B  ↑50K ↓143K   0
s3.PutObjectPart            37 (14.0%)  334.6  14.5ms    2.6ms     30.7ms    14.5ms    30.7ms    ↑1.4M        ↑468M        0
s3.PutObject                3 (1.1%)    27.1   6.6ms     2.8ms     10.4ms    6.5ms     10.3ms    ↑2.8K        ↑75K         0
```
1 parent d4621fd commit 2a5fb97
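As an editorial aside (not part of the commit): a minimal sketch of the stream-opening path this change standardizes on, assuming `pyiceberg[pyarrow]` is installed and using a hypothetical local Parquet file. Before this change, readers resolved a PyArrow `FileSystem` from the path and opened the stream themselves, bypassing any custom `FileIO`.

```python
# Minimal sketch, not from the commit: with this change, readers always obtain
# streams from the FileIO instead of deriving a PyArrow FileSystem from the
# path, so a custom FileIO sees every read.
from pyiceberg.io.pyarrow import PyArrowFileIO

io = PyArrowFileIO()
path = "/tmp/example.parquet"  # hypothetical local file

# Old pattern (removed): fs = _fs_from_file_path(io, path); fs.open_input_file(...)
# New pattern: the FileIO owns stream creation.
with io.new_input(path).open() as stream:
    magic = stream.read(4)  # Parquet files begin with the magic bytes b"PAR1"
    print(magic)
```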

3 files changed (+13 −57 lines)

pyiceberg/io/__init__.py

Lines changed: 0 additions & 13 deletions
```diff
@@ -27,7 +27,6 @@
 
 import importlib
 import logging
-import os
 import warnings
 from abc import ABC, abstractmethod
 from io import SEEK_SET
@@ -37,7 +36,6 @@
     List,
     Optional,
     Protocol,
-    Tuple,
     Type,
     Union,
     runtime_checkable,
@@ -371,14 +369,3 @@ def load_file_io(properties: Properties = EMPTY_DICT, location: Optional[str] =
         raise ModuleNotFoundError(
             'Could not load a FileIO, please consider installing one: pip3 install "pyiceberg[pyarrow]", for more options refer to the docs.'
         ) from e
-
-
-def _parse_location(location: str) -> Tuple[str, str, str]:
-    """Return the path without the scheme."""
-    uri = urlparse(location)
-    if not uri.scheme:
-        return "file", uri.netloc, os.path.abspath(location)
-    elif uri.scheme in ("hdfs", "viewfs"):
-        return uri.scheme, uri.netloc, uri.path
-    else:
-        return uri.scheme, uri.netloc, f"{uri.netloc}{uri.path}"
```

pyiceberg/io/pyarrow.py

Lines changed: 12 additions & 43 deletions
```diff
@@ -69,7 +69,6 @@
     FileInfo,
     FileSystem,
     FileType,
-    FSSpecHandler,
 )
 from sortedcontainers import SortedList
 
@@ -117,7 +116,6 @@
     InputStream,
     OutputFile,
     OutputStream,
-    _parse_location,
 )
 from pyiceberg.manifest import (
     DataFile,
@@ -309,9 +307,7 @@ def open(self, seekable: bool = True) -> InputStream:
                 input_file = self._filesystem.open_input_file(self._path)
             else:
                 input_file = self._filesystem.open_input_stream(self._path, buffer_size=self._buffer_size)
-        except FileNotFoundError:
-            raise
-        except PermissionError:
+        except (FileNotFoundError, PermissionError):
             raise
         except OSError as e:
             if e.errno == 2 or "Path does not exist" in str(e):
@@ -916,27 +912,20 @@ def _get_file_format(file_format: FileFormat, **kwargs: Dict[str, Any]) -> ds.Fi
         raise ValueError(f"Unsupported file format: {file_format}")
 
 
-def _construct_fragment(fs: FileSystem, data_file: DataFile, file_format_kwargs: Dict[str, Any] = EMPTY_DICT) -> ds.Fragment:
-    _, _, path = PyArrowFileIO.parse_location(data_file.file_path)
-    return _get_file_format(data_file.file_format, **file_format_kwargs).make_fragment(path, fs)
-
-
-def _read_deletes(fs: FileSystem, data_file: DataFile) -> Dict[str, pa.ChunkedArray]:
+def _read_deletes(io: FileIO, data_file: DataFile) -> Dict[str, pa.ChunkedArray]:
     if data_file.file_format == FileFormat.PARQUET:
-        delete_fragment = _construct_fragment(
-            fs,
-            data_file,
-            file_format_kwargs={"dictionary_columns": ("file_path",), "pre_buffer": True, "buffer_size": ONE_MEGABYTE},
-        )
-        table = ds.Scanner.from_fragment(fragment=delete_fragment).to_table()
+        with io.new_input(data_file.file_path).open() as fi:
+            delete_fragment = _get_file_format(
+                data_file.file_format, dictionary_columns=("file_path",), pre_buffer=True, buffer_size=ONE_MEGABYTE
+            ).make_fragment(fi)
+            table = ds.Scanner.from_fragment(fragment=delete_fragment).to_table()
         table = table.unify_dictionaries()
         return {
             file.as_py(): table.filter(pc.field("file_path") == file).column("pos")
             for file in table.column("file_path").chunks[0].dictionary
         }
     elif data_file.file_format == FileFormat.PUFFIN:
-        _, _, path = PyArrowFileIO.parse_location(data_file.file_path)
-        with fs.open_input_file(path) as fi:
+        with io.new_input(data_file.file_path).open() as fi:
             payload = fi.read()
 
             return PuffinFile(payload).to_vector()
@@ -1383,7 +1372,7 @@ def _get_column_projection_values(
 
 
 def _task_to_record_batches(
-    fs: FileSystem,
+    io: FileIO,
     task: FileScanTask,
     bound_row_filter: BooleanExpression,
     projected_schema: Schema,
@@ -1393,9 +1382,8 @@ def _task_to_record_batches(
     name_mapping: Optional[NameMapping] = None,
     partition_spec: Optional[PartitionSpec] = None,
 ) -> Iterator[pa.RecordBatch]:
-    _, _, path = _parse_location(task.file.file_path)
     arrow_format = ds.ParquetFileFormat(pre_buffer=True, buffer_size=(ONE_MEGABYTE * 8))
-    with fs.open_input_file(path) as fin:
+    with io.new_input(task.file.file_path).open() as fin:
         fragment = arrow_format.make_fragment(fin)
         physical_schema = fragment.physical_schema
         # In V1 and V2 table formats, we only support Timestamp 'us' in Iceberg Schema
@@ -1479,7 +1467,7 @@ def _read_all_delete_files(io: FileIO, tasks: Iterable[FileScanTask]) -> Dict[st
         executor = ExecutorFactory.get_or_create()
         deletes_per_files: Iterator[Dict[str, ChunkedArray]] = executor.map(
             lambda args: _read_deletes(*args),
-            [(_fs_from_file_path(io, delete_file.file_path), delete_file) for delete_file in unique_deletes],
+            [(io, delete_file) for delete_file in unique_deletes],
         )
         for delete in deletes_per_files:
             for file, arr in delete.items():
@@ -1491,25 +1479,6 @@ def _read_all_delete_files(io: FileIO, tasks: Iterable[FileScanTask]) -> Dict[st
     return deletes_per_file
 
 
-def _fs_from_file_path(io: FileIO, file_path: str) -> FileSystem:
-    scheme, netloc, _ = _parse_location(file_path)
-    if isinstance(io, PyArrowFileIO):
-        return io.fs_by_scheme(scheme, netloc)
-    else:
-        try:
-            from pyiceberg.io.fsspec import FsspecFileIO
-
-            if isinstance(io, FsspecFileIO):
-                from pyarrow.fs import PyFileSystem
-
-                return PyFileSystem(FSSpecHandler(io.get_fs(scheme)))
-            else:
-                raise ValueError(f"Expected PyArrowFileIO or FsspecFileIO, got: {io}")
-        except ModuleNotFoundError as e:
-            # When FsSpec is not installed
-            raise ValueError(f"Expected PyArrowFileIO or FsspecFileIO, got: {io}") from e
-
-
 class ArrowScan:
     _table_metadata: TableMetadata
     _io: FileIO
@@ -1654,7 +1623,7 @@ def _record_batches_from_scan_tasks_and_deletes(
             if self._limit is not None and total_row_count >= self._limit:
                 break
             batches = _task_to_record_batches(
-                _fs_from_file_path(self._io, task.file.file_path),
+                self._io,
                 task,
                 self._bound_row_filter,
                 self._projected_schema,
```
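Because every read now goes through `FileIO.new_input`, a custom `FileIO` can observe or override stream creation. Below is a minimal editorial sketch, assuming a hypothetical `CountingFileIO` wrapper (not part of the codebase) that delegates to `PyArrowFileIO` while recording each location it opens.

```python
# Hypothetical helper, not in the codebase: delegates to PyArrowFileIO but
# records every location passed to new_input(). With this change, data files,
# positional-delete files, and Puffin blobs all show up here, because
# pyarrow.py no longer resolves a PyArrow FileSystem behind the FileIO's back.
from typing import List

from pyiceberg.io.pyarrow import PyArrowFile, PyArrowFileIO
from pyiceberg.typedef import EMPTY_DICT, Properties


class CountingFileIO(PyArrowFileIO):
    def __init__(self, properties: Properties = EMPTY_DICT) -> None:
        super().__init__(properties)
        self.opened: List[str] = []  # locations requested through this FileIO

    def new_input(self, location: str) -> PyArrowFile:
        self.opened.append(location)
        return super().new_input(location)
```

Passing such an instance wherever a `FileIO` is expected (for example, as the first argument of the updated `_read_deletes`) is enough to route all reads through it; before this commit, a PyArrow `FileSystem` was derived from the path and the wrapper would have been bypassed.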

tests/io/test_pyarrow.py

Lines changed: 1 addition & 1 deletion
```diff
@@ -1539,7 +1539,7 @@ def deletes_file(tmp_path: str, example_task: FileScanTask) -> str:
 
 
 def test_read_deletes(deletes_file: str, example_task: FileScanTask) -> None:
-    deletes = _read_deletes(LocalFileSystem(), DataFile.from_args(file_path=deletes_file, file_format=FileFormat.PARQUET))
+    deletes = _read_deletes(PyArrowFileIO(), DataFile.from_args(file_path=deletes_file, file_format=FileFormat.PARQUET))
     assert set(deletes.keys()) == {example_task.file.file_path}
     assert list(deletes.values())[0] == pa.chunked_array([[1, 3, 5]])
 
```
