diff --git a/doc/source/development/extending.rst b/doc/source/development/extending.rst index e67829b8805eb..cab0428d650b6 100644 --- a/doc/source/development/extending.rst +++ b/doc/source/development/extending.rst @@ -489,6 +489,69 @@ registers the default "matplotlib" backend as follows. More information on how to implement a third-party plotting backend can be found at https://github.com/pandas-dev/pandas/blob/main/pandas/plotting/__init__.py#L1. +.. _extending.io-engines: + +IO engines +----------- + +pandas provides several IO connectors such as :func:`read_csv` or :meth:`DataFrame.to_parquet`, and many +of those support multiple engines. For example, :func:`read_csv` supports the ``python``, ``c`` +and ``pyarrow`` engines, each with its advantages and disadvantages, making each more appropriate +for certain use cases. + +Third-party package developers can implement engines for any of the pandas readers and writers. +When a ``pandas.read_*`` function or ``DataFrame.to_*`` method is called with an ``engine`` +that is not known to pandas, pandas will look into the entry points registered in the group +``pandas.io_engine`` by the packages in the environment, and will call the corresponding method. + +An engine is a simple Python class which implements one or more of the pandas readers and writers +as class methods: + +.. code-block:: python + + class EmptyDataEngine: + @classmethod + def read_json(cls, path_or_buf=None, **kwargs): + return pd.DataFrame() + + @classmethod + def to_json(cls, path_or_buf=None, **kwargs): + with open(path_or_buf, "w") as f: + f.write() + + @classmethod + def read_clipboard(cls, sep='\\s+', dtype_backend=None, **kwargs): + return pd.DataFrame() + +A single engine can support multiple readers and writers. When possible, it is a good practice for +an engine to provide both a reader and writer for the supported formats. But it is possible to +provide just one of them. 
+ +The package implementing the engine needs to create an entry point for pandas to be able to discover +it. This is done in ``pyproject.toml``: + +.. code-block:: toml + + [project.entry-points."pandas.io_engine"] + empty = "empty_data:EmptyDataEngine" + +The first line should always be the same, creating the entry point in the ``pandas.io_engine`` group. +In the second line, ``empty`` is the name of the engine, and ``empty_data:EmptyDataEngine`` is where +to find the engine class in the package (``empty_data`` is the module name in this case). + +If a user has the package of the example installed, then it would be possible to use: + +.. code-block:: python + + pd.read_json("myfile.json", engine="empty") + +When pandas detects that no ``empty`` engine exists for the ``read_json`` reader in pandas, it will +look at the entry points, find the ``EmptyDataEngine`` engine, and call the ``read_json`` +method on it with the arguments provided by the user (except the ``engine`` parameter). + +To avoid conflicts in the names of engines, we keep an "IO engines" section in our +`Ecosystem page <https://pandas.pydata.org/community/ecosystem.html#io-engines>`__. + .. _extending.pandas_priority: Arithmetic with 3rd party types diff --git a/doc/source/whatsnew/v3.0.0.rst b/doc/source/whatsnew/v3.0.0.rst index 03a386708323d..1130480d3c7b5 100644 --- a/doc/source/whatsnew/v3.0.0.rst +++ b/doc/source/whatsnew/v3.0.0.rst @@ -88,6 +88,7 @@ Other enhancements - Support passing a :class:`Iterable[Hashable]` input to :meth:`DataFrame.drop_duplicates` (:issue:`59237`) - Support reading Stata 102-format (Stata 1) dta files (:issue:`58978`) - Support reading Stata 110-format (Stata 7) dta files (:issue:`47176`) +- Third-party packages can now register engines that can be used in pandas I/O operations :func:`read_iceberg` and :meth:`DataFrame.to_iceberg` (:issue:`61584`) .. --------------------------------------------------------------------------- .. 
_whatsnew_300.notable_bug_fixes: diff --git a/pandas/core/frame.py b/pandas/core/frame.py index 8053c17437c5e..e9a088aa99399 100644 --- a/pandas/core/frame.py +++ b/pandas/core/frame.py @@ -188,7 +188,10 @@ nargsort, ) -from pandas.io.common import get_handle +from pandas.io.common import ( + allow_third_party_engines, + get_handle, +) from pandas.io.formats import ( console, format as fmt, @@ -3547,6 +3550,7 @@ def to_xml( return xml_formatter.write_output() + @allow_third_party_engines def to_iceberg( self, table_identifier: str, @@ -3556,6 +3560,7 @@ def to_iceberg( location: str | None = None, append: bool = False, snapshot_properties: dict[str, str] | None = None, + engine: str | None = None, ) -> None: """ Write a DataFrame to an Apache Iceberg table. @@ -3580,6 +3585,10 @@ def to_iceberg( If ``True``, append data to the table, instead of replacing the content. snapshot_properties : dict of {str: str}, optional Custom properties to be added to the snapshot summary + engine : str, optional + The engine to use. Engines can be installed via third-party packages. For an + updated list of existing pandas I/O engines check the I/O engines section of + our Ecosystem page. See Also -------- diff --git a/pandas/io/common.py b/pandas/io/common.py index 1a9e6b472463d..3ec9e094fb118 100644 --- a/pandas/io/common.py +++ b/pandas/io/common.py @@ -9,6 +9,7 @@ import codecs from collections import defaultdict from collections.abc import ( + Callable, Hashable, Mapping, Sequence, @@ -16,6 +17,7 @@ import dataclasses import functools import gzip +from importlib.metadata import entry_points from io import ( BufferedIOBase, BytesIO, @@ -90,6 +92,10 @@ from pandas import MultiIndex +# registry of I/O engines. 
It is populated the first time a non-core +# pandas engine is used +_io_engines: dict[str, Any] | None = None + @dataclasses.dataclass class IOArgs: @@ -1282,3 +1288,149 @@ def dedup_names( counts[col] = cur_count + 1 return names + + +def _get_io_engine(name: str) -> Any: + """ + Return an I/O engine by its name. + + pandas I/O engines can be registered via entry points. The first time this + function is called it will register all the entry points of the "pandas.io_engine" + group and cache them in the global `_io_engines` variable. + + Engines are implemented as classes with the `read_` and `to_` + methods (classmethods) for the formats they wish to provide. This function will + return the method from the engine and format being requested. + + Parameters + ---------- + name : str + The engine name provided by the user in `engine=`. + + Examples + -------- + An engine is implemented with a class like: + + >>> class DummyEngine: + ... @classmethod + ... def read_csv(cls, filepath_or_buffer, **kwargs): + ... # the engine signature must match the pandas method signature + ... 
return pd.DataFrame() + + It must be registered as an entry point with the engine name: + + ``` + [project.entry-points."pandas.io_engine"] + dummy = "pandas:io.dummy.DummyEngine" + + ``` + + Then the `read_csv` method of the engine can be used with: + + >>> _get_io_engine(engine_name="dummy").read_csv("myfile.csv") # doctest: +SKIP + + This is used internally to dispatch the next pandas call to the engine caller: + + >>> df = read_csv("myfile.csv", engine="dummy") # doctest: +SKIP + """ + global _io_engines + + if _io_engines is None: + _io_engines = {} + for entry_point in entry_points().select(group="pandas.io_engine"): + if entry_point.dist: + package_name = entry_point.dist.metadata["Name"] + else: + package_name = None + if entry_point.name in _io_engines: + _io_engines[entry_point.name]._packages.append(package_name) + else: + _io_engines[entry_point.name] = entry_point.load() + _io_engines[entry_point.name]._packages = [package_name] + + try: + engine = _io_engines[name] + except KeyError as err: + raise ValueError( + f"'{name}' is not a known engine. Some engines are only available " + "after installing the package that provides them." + ) from err + + if len(engine._packages) > 1: + msg = ( + f"The engine '{name}' has been registered by the package " + f"'{engine._packages[0]}' and will be used. " + ) + if len(engine._packages) == 2: + msg += ( + f"The package '{engine._packages[1]}' also tried to register " + "the engine, but it couldn't because it was already registered." + ) + else: + msg += ( + "The packages {str(engine._packages[1:]}[1:-1] also tried to register " + "the engine, but they couldn't because it was already registered." + ) + warnings.warn(msg, RuntimeWarning, stacklevel=find_stack_level()) + + return engine + + +def allow_third_party_engines( + skip_engines: list[str] | Callable | None = None, +) -> Callable: + """ + Decorator to avoid boilerplate code when allowing readers and writers to use + third-party engines. 
+ + The decorator will introspect the function to know which format should be obtained, + and to know if it's a reader or a writer. Then it will check if the engine has been + registered, and if it has, it will dispatch the execution to the engine with the + arguments provided by the user. + + Parameters + ---------- + skip_engines : list of str, optional + For engines that are implemented in pandas, we want to skip them for this engine + dispatching system. They should be specified in this parameter. + + Examples + -------- + The decorator works both with the `skip_engines` parameter, or without: + + >>> class DataFrame: + ... @allow_third_party_engines(["python", "c", "pyarrow"]) + ... def read_csv(filepath_or_buffer, **kwargs): + ... pass + ... + ... @allow_third_party_engines + ... def read_sas(filepath_or_buffer, **kwargs): + ... pass + """ + + def decorator(func: Callable) -> Callable: + @functools.wraps(func) + def wrapper(*args: Any, **kwargs: Any) -> Any: + if callable(skip_engines) or skip_engines is None: + skip_engine = False + else: + skip_engine = kwargs["engine"] in skip_engines + + if "engine" in kwargs and not skip_engine: + engine_name = kwargs.pop("engine") + engine = _get_io_engine(engine_name) + try: + return getattr(engine, func.__name__)(*args, **kwargs) + except AttributeError as err: + raise ValueError( + f"The engine '{engine_name}' does not provide a " + f"'{func.__name__}' function" + ) from err + else: + return func(*args, **kwargs) + + return wrapper + + if callable(skip_engines): + return decorator(skip_engines) + return decorator diff --git a/pandas/io/iceberg.py b/pandas/io/iceberg.py index dcb675271031e..c778a95809f97 100644 --- a/pandas/io/iceberg.py +++ b/pandas/io/iceberg.py @@ -6,7 +6,10 @@ from pandas import DataFrame +from pandas.io.common import allow_third_party_engines + +@allow_third_party_engines def read_iceberg( table_identifier: str, catalog_name: str | None = None, @@ -18,6 +21,7 @@ def read_iceberg( snapshot_id: int 
from types import SimpleNamespace

import pytest

import pandas._testing as tm

from pandas.io import common


class _MockIoEngine:
    """Minimal third-party engine that only implements ``read_foo``."""

    @classmethod
    def read_foo(cls, fname):
        return "third-party"


@pytest.fixture
def patch_engine(monkeypatch):
    """Make every engine name resolve to ``_MockIoEngine``."""
    monkeypatch.setattr(common, "_get_io_engine", lambda name: _MockIoEngine)


@pytest.fixture
def patch_entry_points(monkeypatch):
    """
    Replace the entry point discovery with three fake entry points.

    One engine ("myengine") is registered once, and another ("duplicate") is
    registered by two different packages. The engine cache is reset so that
    ``_get_io_engine`` reads the fake entry points.
    """

    class MockEntryPoint:
        name = "myengine"
        dist = SimpleNamespace(metadata={"Name": "mypackage"})

        @staticmethod
        def load():
            return _MockIoEngine

    class MockDuplicate1:
        name = "duplicate"
        dist = SimpleNamespace(metadata={"Name": "package1"})

        @staticmethod
        def load():
            return SimpleNamespace(read_foo=lambda fname: "dup1")

    class MockDuplicate2:
        name = "duplicate"
        dist = SimpleNamespace(metadata={"Name": "package2"})

        @staticmethod
        def load():
            # Distinct return value so tests can tell which package was loaded.
            return SimpleNamespace(read_foo=lambda fname: "dup2")

    monkeypatch.setattr(common, "_io_engines", None)
    monkeypatch.setattr(
        common,
        "entry_points",
        lambda: SimpleNamespace(
            select=lambda group: [MockEntryPoint, MockDuplicate1, MockDuplicate2]
        ),
    )


class TestIoEngines:
    def test_decorator_with_no_engine(self, patch_engine):
        @common.allow_third_party_engines
        def read_foo(fname, engine=None):
            return "default"

        result = read_foo("myfile.foo")
        assert result == "default"

    def test_decorator_with_skipped_engine(self, patch_engine):
        @common.allow_third_party_engines(skip_engines=["c"])
        def read_foo(fname, engine=None):
            return "default"

        result = read_foo("myfile.foo", engine="c")
        assert result == "default"

    def test_decorator_with_third_party_engine(self, patch_engine):
        @common.allow_third_party_engines
        def read_foo(fname, engine=None):
            return "default"

        result = read_foo("myfile.foo", engine="third-party")
        assert result == "third-party"

    def test_decorator_with_third_party_engine_but_no_method(self, patch_engine):
        @common.allow_third_party_engines
        def read_bar(fname, engine=None):
            return "default"

        msg = "'third-party' does not provide a 'read_bar'"
        with pytest.raises(ValueError, match=msg):
            read_bar("myfile.foo", engine="third-party")

    def test_correct_io_engine(self, patch_entry_points):
        result = common._get_io_engine("myengine")
        assert result is _MockIoEngine

    def test_unknown_io_engine(self, patch_entry_points):
        with pytest.raises(ValueError, match="'unknown' is not a known engine"):
            common._get_io_engine("unknown")

    def test_duplicate_engine(self, patch_entry_points):
        with tm.assert_produces_warning(
            RuntimeWarning,
            match="'duplicate' has been registered by the package 'package1'",
        ):
            result = common._get_io_engine("duplicate")
        assert hasattr(result, "read_foo")
        # The first package to register the name is the one that gets used.
        assert result.read_foo("myfile.foo") == "dup1"
| [staircase](https://www.staircase.dev/) | `sc` | `Series`, `DataFrame` | | [woodwork](https://github.com/alteryx/woodwork) | `slice` | `Series`, `DataFrame` | +## IO engines + +Table with the third-party [IO engines](https://pandas.pydata.org/docs/development/extending.html#io-engines) +available to `read_*` functions and `DataFrame.to_*` methods. + + | Engine name | Library | Supported formats | + | ----------------|------------------------------------------------------ | ------------------------------- | + | | | | + +IO engines can be used by specifying the engine when calling a reader or writer +(e.g. `pd.read_csv("myfile.csv", engine="myengine")`). + ## Development tools ### [pandas-stubs](https://github.com/VirtusLab/pandas-stubs)