ENH: Allow third-party packages to register IO engines #61642

Open · wants to merge 5 commits into main
63 changes: 63 additions & 0 deletions doc/source/development/extending.rst
@@ -489,6 +489,69 @@ registers the default "matplotlib" backend as follows.
More information on how to implement a third-party plotting backend can be found at
https://github.com/pandas-dev/pandas/blob/main/pandas/plotting/__init__.py#L1.

.. _extending.io-engines:

IO engines
-----------

pandas provides several IO connectors such as :func:`read_csv` or :meth:`DataFrame.to_parquet`, and many
of those support multiple engines. For example, :func:`read_csv` supports the ``python``, ``c``
and ``pyarrow`` engines, each with its advantages and disadvantages, making each more appropriate
for certain use cases.

Third-party package developers can implement engines for any of the pandas readers and writers.
When a ``pandas.read_*`` function or ``DataFrame.to_*`` method is called with an ``engine="<name>"``
that is not known to pandas, pandas will look at the entry points registered in the
``pandas.io_engine`` group by the packages in the environment, and will call the corresponding method.

An engine is a simple Python class which implements one or more of the pandas readers and writers
as class methods:

.. code-block:: python

    class EmptyDataEngine:
        @classmethod
        def read_json(cls, path_or_buf=None, **kwargs):
            return pd.DataFrame()

        @classmethod
        def to_json(cls, path_or_buf=None, **kwargs):
            with open(path_or_buf, "w") as f:
                f.write("{}")  # write an empty JSON document

        @classmethod
        def read_clipboard(cls, sep="\\s+", dtype_backend=None, **kwargs):
            return pd.DataFrame()

A single engine can support multiple readers and writers. When possible, it is good practice for
an engine to provide both the reader and the writer for a supported format, but providing just
one of them is also possible.
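As a self-contained sketch of a paired reader and writer, the hypothetical engine below handles a
JSON-lines format with only the standard library. A real pandas engine would return a
``pd.DataFrame`` from the reader; plain lists of records are used here to keep the sketch
dependency-free, and the class name is illustrative:

```python
import json


class JsonLinesEngine:
    """Hypothetical engine providing both a reader and a writer."""

    @classmethod
    def read_json(cls, path_or_buf=None, **kwargs):
        # Read one JSON document per line into a list of records.
        # A real engine would build and return a pd.DataFrame instead.
        with open(path_or_buf) as f:
            return [json.loads(line) for line in f if line.strip()]

    @classmethod
    def to_json(cls, data, path_or_buf=None, **kwargs):
        # Write each record as one JSON document per line.
        with open(path_or_buf, "w") as f:
            for record in data:
                f.write(json.dumps(record) + "\n")
```

Because the two methods share one format, data written with ``to_json`` round-trips through
``read_json`` unchanged.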

The package implementing the engine needs to create an entry point for pandas to be able to discover
it. This is done in ``pyproject.toml``:

.. code-block:: toml

    [project.entry-points."pandas.io_engine"]
    empty = "empty_data:EmptyDataEngine"

The first line should always be the same, creating the entry point in the ``pandas.io_engine`` group.
In the second line, ``empty`` is the name of the engine, and ``empty_data:EmptyDataEngine`` is where
to find the engine class in the package (``empty_data`` is the module name in this case).

If a user has the example package installed, it is then possible to use:

.. code-block:: python

    pd.read_json("myfile.json", engine="empty")

When pandas detects that no ``empty`` engine exists for the ``read_json`` reader, it will look
at the entry points, find the ``EmptyDataEngine`` engine, and call its ``read_json`` method with
the arguments provided by the user (except the ``engine`` parameter).
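The dispatch step can be simulated with a plain registry: look the engine up by name, strip the
``engine`` argument, and forward the call to the engine method matching the pandas function's
name. The registry and helper below are illustrative, not pandas' actual internals:

```python
class EmptyDataEngine:
    @classmethod
    def read_json(cls, path_or_buf=None, **kwargs):
        # A real engine would return pd.DataFrame(); a placeholder keeps
        # this sketch dependency-free.
        return "empty-dataframe"


# In pandas this mapping is populated from the "pandas.io_engine"
# entry-point group; here it is hard-coded for illustration.
_REGISTRY = {"empty": EmptyDataEngine}


def dispatch(func_name, *args, engine=None, **kwargs):
    # Resolve the engine by name and forward the call, minus `engine`.
    try:
        engine_cls = _REGISTRY[engine]
    except KeyError:
        raise ValueError(f"'{engine}' is not a known engine") from None
    return getattr(engine_cls, func_name)(*args, **kwargs)
```

With this registry, ``dispatch("read_json", "myfile.json", engine="empty")`` reaches
``EmptyDataEngine.read_json``, while an unknown name raises a ``ValueError``.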

To avoid conflicts in the names of engines, we keep an "IO engines" section in our
`Ecosystem page <https://pandas.pydata.org/community/ecosystem.html#io-engines>`_.

.. _extending.pandas_priority:

Arithmetic with 3rd party types
1 change: 1 addition & 0 deletions doc/source/whatsnew/v3.0.0.rst
@@ -88,6 +88,7 @@ Other enhancements
- Support passing a :class:`Iterable[Hashable]` input to :meth:`DataFrame.drop_duplicates` (:issue:`59237`)
- Support reading Stata 102-format (Stata 1) dta files (:issue:`58978`)
- Support reading Stata 110-format (Stata 7) dta files (:issue:`47176`)
- Third-party packages can now register engines for the pandas I/O operations :func:`read_iceberg` and :meth:`DataFrame.to_iceberg` (:issue:`61584`)

.. ---------------------------------------------------------------------------
.. _whatsnew_300.notable_bug_fixes:
11 changes: 10 additions & 1 deletion pandas/core/frame.py
@@ -188,7 +188,10 @@
nargsort,
)

from pandas.io.common import get_handle
from pandas.io.common import (
allow_third_party_engines,
get_handle,
)
from pandas.io.formats import (
console,
format as fmt,
@@ -3547,6 +3550,7 @@ def to_xml(

return xml_formatter.write_output()

@allow_third_party_engines
def to_iceberg(
self,
table_identifier: str,
@@ -3556,6 +3560,7 @@ def to_iceberg(
location: str | None = None,
append: bool = False,
snapshot_properties: dict[str, str] | None = None,
engine: str | None = None,
) -> None:
"""
Write a DataFrame to an Apache Iceberg table.
@@ -3580,6 +3585,10 @@
If ``True``, append data to the table, instead of replacing the content.
snapshot_properties : dict of {str: str}, optional
Custom properties to be added to the snapshot summary
engine : str, optional
The engine to use. Engines can be installed via third-party packages. For an
updated list of existing pandas I/O engines check the I/O engines section of
our Ecosystem page.

See Also
--------
152 changes: 152 additions & 0 deletions pandas/io/common.py
@@ -9,13 +9,15 @@
import codecs
from collections import defaultdict
from collections.abc import (
Callable,
Hashable,
Mapping,
Sequence,
)
import dataclasses
import functools
import gzip
from importlib.metadata import entry_points
from io import (
BufferedIOBase,
BytesIO,
@@ -90,6 +92,10 @@

from pandas import MultiIndex

# registry of I/O engines. It is populated the first time a non-core
# pandas engine is used
_io_engines: dict[str, Any] | None = None


@dataclasses.dataclass
class IOArgs:
@@ -1282,3 +1288,149 @@ def dedup_names(
counts[col] = cur_count + 1

return names


def _get_io_engine(name: str) -> Any:
"""
Return an I/O engine by its name.

pandas I/O engines can be registered via entry points. The first time this
function is called it will register all the entry points of the "pandas.io_engine"
group and cache them in the global `_io_engines` variable.

Engines are implemented as classes with the `read_<format>` and `to_<format>`
methods (classmethods) for the formats they wish to provide. This function will
return the method from the engine and format being requested.

Parameters
----------
name : str
The engine name provided by the user in `engine=<value>`.

Examples
--------
An engine is implemented with a class like:

>>> class DummyEngine:
... @classmethod
... def read_csv(cls, filepath_or_buffer, **kwargs):
... # the engine signature must match the pandas method signature
... return pd.DataFrame()

It must be registered as an entry point with the engine name:

```
[project.entry-points."pandas.io_engine"]
dummy = "pandas:io.dummy.DummyEngine"
```

Then the `read_csv` method of the engine can be used with:

>>> _get_io_engine("dummy").read_csv("myfile.csv")  # doctest: +SKIP

This is used internally to dispatch the next pandas call to the engine caller:

>>> df = read_csv("myfile.csv", engine="dummy") # doctest: +SKIP
"""
global _io_engines

if _io_engines is None:
_io_engines = {}
for entry_point in entry_points().select(group="pandas.io_engine"):
if entry_point.dist:
package_name = entry_point.dist.metadata["Name"]
else:
package_name = None
if entry_point.name in _io_engines:
_io_engines[entry_point.name]._packages.append(package_name)
else:
_io_engines[entry_point.name] = entry_point.load()
_io_engines[entry_point.name]._packages = [package_name]

try:
engine = _io_engines[name]
except KeyError as err:
raise ValueError(
f"'{name}' is not a known engine. Some engines are only available "
"after installing the package that provides them."
) from err

if len(engine._packages) > 1:
msg = (
f"The engine '{name}' has been registered by the package "
f"'{engine._packages[0]}' and will be used. "
)
if len(engine._packages) == 2:
msg += (
f"The package '{engine._packages[1]}' also tried to register "
"the engine, but it couldn't because it was already registered."
)
else:
msg += (
f"The packages {str(engine._packages[1:])[1:-1]} also tried to register "
"the engine, but they couldn't because it was already registered."
)
warnings.warn(msg, RuntimeWarning, stacklevel=find_stack_level())

return engine


def allow_third_party_engines(
skip_engines: list[str] | Callable | None = None,
) -> Callable:
"""
Decorator to avoid boilerplate code when allowing readers and writers to use
third-party engines.

The decorator will introspect the function to know which format should be obtained,
and to know if it's a reader or a writer. Then it will check if the engine has been
registered, and if it has, it will dispatch the execution to the engine with the
arguments provided by the user.

Parameters
----------
skip_engines : list of str, optional
Engines that are implemented in pandas itself must be skipped by this
dispatching system; specify their names in this parameter.

Examples
--------
The decorator works both with the `skip_engines` parameter, or without:

>>> class DataFrame:
... @allow_third_party_engines(["python", "c", "pyarrow"])
... def read_csv(filepath_or_buffer, **kwargs):
... pass
...
... @allow_third_party_engines
... def read_sas(filepath_or_buffer, **kwargs):
... pass
"""

def decorator(func: Callable) -> Callable:
@functools.wraps(func)
def wrapper(*args: Any, **kwargs: Any) -> Any:
if callable(skip_engines) or skip_engines is None:
skip_engine = False
else:
skip_engine = kwargs.get("engine") in skip_engines

if "engine" in kwargs and not skip_engine:
engine_name = kwargs.pop("engine")
engine = _get_io_engine(engine_name)
try:
return getattr(engine, func.__name__)(*args, **kwargs)
except AttributeError as err:
raise ValueError(
f"The engine '{engine_name}' does not provide a "
f"'{func.__name__}' function"
) from err
else:
return func(*args, **kwargs)

return wrapper

if callable(skip_engines):
return decorator(skip_engines)
return decorator
8 changes: 8 additions & 0 deletions pandas/io/iceberg.py
@@ -6,7 +6,10 @@

from pandas import DataFrame

from pandas.io.common import allow_third_party_engines


@allow_third_party_engines
def read_iceberg(
table_identifier: str,
catalog_name: str | None = None,
@@ -18,6 +21,7 @@ def read_iceberg(
snapshot_id: int | None = None,
limit: int | None = None,
scan_properties: dict[str, Any] | None = None,
engine: str | None = None,
) -> DataFrame:
"""
Read an Apache Iceberg table into a pandas DataFrame.
@@ -52,6 +56,10 @@
scan_properties : dict of {str: obj}, optional
Additional Table properties as a dictionary of string key value pairs to use
for this scan.
engine : str, optional
The engine to use. Engines can be installed via third-party packages. For an
updated list of existing pandas I/O engines check the I/O engines section of
our Ecosystem page.

Returns
-------