Commit ff1ecbe

jaychia and richardliaw authored
[data] Integrate Ray Dataset with Daft Dataframe (#51531)
## Why are these changes needed?

This adds nice integrations between Daft and Ray Data along with:

- Unit test for data round-trip (including more interesting types such as a Tensor)
- Documentation
- Implementation on Ray Dataset for conversion to/from Daft Dataframes

## Related issue number

Re-opening #50630 which was borked by a bad rebase.

## Checks

- [x] I've signed off every commit(by using the -s flag, i.e., `git commit -s`) in this PR.
- [ ] I've run `scripts/format.sh` to lint the changes in this PR.
- [x] I've included any doc changes needed for https://docs.ray.io/en/master/.
- [x] I've added any new APIs to the API Reference. For example, if I added a method in Tune, I've added it in `doc/source/tune/api/` under the corresponding `.rst` file.
- [x] I've made sure the tests are passing. Note that there might be a few flaky tests, see the recent failures at https://flakey-tests.ray.io/
- Testing Strategy
   - [x] Unit tests
   - [ ] Release tests
   - [ ] This PR is not tested :(

---------

Signed-off-by: Jay Chia <17691182+jaychia@users.noreply.github.com>
Signed-off-by: Richard Liaw <rliaw@berkeley.edu>
Co-authored-by: Richard Liaw <rliaw@berkeley.edu>
1 parent 34ae461 commit ff1ecbe

13 files changed: +208 -5 lines changed

doc/source/data/api/input_output.rst

Lines changed: 10 additions & 0 deletions
@@ -233,6 +233,16 @@ ClickHouse
 
    read_clickhouse
 
+Daft
+----
+
+.. autosummary::
+   :nosignatures:
+   :toctree: doc/
+
+   from_daft
+   Dataset.to_daft
+
 Dask
 ----
 

doc/source/data/comparisons.rst

Lines changed: 3 additions & 3 deletions
@@ -19,11 +19,11 @@ How does Ray Data compare to other solutions for offline inference?
 
     For a more detailed performance comparison between Ray Data and Sagemaker Batch Transform, see `Offline Batch Inference: Comparing Ray, Apache Spark, and SageMaker <https://www.anyscale.com/blog/offline-batch-inference-comparing-ray-apache-spark-and-sagemaker>`_.
 
-.. dropdown:: Distributed Data Processing Frameworks: Apache Spark
+.. dropdown:: Distributed Data Processing Frameworks: Apache Spark and Daft
 
-    Ray Data handles many of the same batch processing workloads as `Apache Spark <https://spark.apache.org/>`_, but with a streaming paradigm that is better suited for GPU workloads for deep learning inference.
+    Ray Data handles many of the same batch processing workloads as `Apache Spark <https://spark.apache.org/>`_ and `Daft <https://www.getdaft.io>`_, but with a streaming paradigm that is better suited for GPU workloads for deep learning inference.
 
-    Ray Data doesn't have a SQL interface and isn't meant as a replacement for generic ETL pipelines like Spark.
+    However, Ray Data doesn't have a SQL interface and isn't meant as a replacement for generic ETL pipelines like Spark and Daft.
 
     For a more detailed performance comparison between Ray Data and Apache Spark, see `Offline Batch Inference: Comparing Ray, Apache Spark, and SageMaker <https://www.anyscale.com/blog/offline-batch-inference-comparing-ray-apache-spark-and-sagemaker>`_.
 

doc/source/data/loading-data.rst

Lines changed: 25 additions & 1 deletion
@@ -469,7 +469,7 @@ Ray Data interoperates with libraries like pandas, NumPy, and Arrow.
 Loading data from distributed DataFrame libraries
 ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
 
-Ray Data interoperates with distributed data processing frameworks like
+Ray Data interoperates with distributed data processing frameworks like `Daft <https://www.getdaft.io>`_,
 :ref:`Dask <dask-on-ray>`, :ref:`Spark <spark-on-ray>`, :ref:`Modin <modin-on-ray>`, and
 :ref:`Mars <mars-on-ray>`.
 
@@ -480,6 +480,30 @@ Ray Data interoperates with distributed data processing frameworks like
 
 .. tab-set::
 
+    .. tab-item:: Daft
+
+        To create a :class:`~ray.data.dataset.Dataset` from a `Daft DataFrame <https://www.getdaft.io/projects/docs/en/stable/api_docs/dataframe.html>`_, call
+        :func:`~ray.data.from_daft`. This function executes the Daft dataframe and constructs a ``Dataset`` backed by the resultant arrow data produced
+        by your Daft query.
+
+        .. testcode::
+
+            import daft
+            import ray
+
+            ray.init()
+
+            df = daft.from_pydict({"int_col": [i for i in range(10000)], "str_col": [str(i) for i in range(10000)]})
+            ds = ray.data.from_daft(df)
+
+            ds.show(3)
+
+        .. testoutput::
+
+            {'int_col': 0, 'str_col': '0'}
+            {'int_col': 1, 'str_col': '1'}
+            {'int_col': 2, 'str_col': '2'}
+
     .. tab-item:: Dask
 
         To create a :class:`~ray.data.dataset.Dataset` from a

doc/source/data/saving-data.rst

Lines changed: 43 additions & 1 deletion
@@ -208,12 +208,25 @@ on the head node.
 Converting Datasets to distributed DataFrames
 ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
 
-Ray Data interoperates with distributed data processing frameworks like
+Ray Data interoperates with distributed data processing frameworks like `Daft <https://www.getdaft.io>`_,
 :ref:`Dask <dask-on-ray>`, :ref:`Spark <spark-on-ray>`, :ref:`Modin <modin-on-ray>`, and
 :ref:`Mars <mars-on-ray>`.
 
 .. tab-set::
 
+    .. tab-item:: Daft
+
+        To convert a :class:`~ray.data.dataset.Dataset` to a `Daft Dataframe <https://www.getdaft.io/projects/docs/en/stable/api_docs/dataframe.html>`_, call
+        :meth:`Dataset.to_daft() <ray.data.Dataset.to_daft>`.
+
+        .. testcode::
+
+            import ray
+
+            ds = ray.data.read_csv("s3://anonymous@ray-example-data/iris.csv")
+
+            df = ds.to_daft()
+
     .. tab-item:: Dask
 
         To convert a :class:`~ray.data.dataset.Dataset` to a
@@ -228,6 +241,35 @@ Ray Data interoperates with distributed data processing frameworks like
 
             df = ds.to_dask()
 
+            df
+
+        .. testoutput::
+
+            ╭───────────────────┬──────────────────┬───────────────────┬──────────────────┬────────╮
+            │ sepal length (cm) ┆ sepal width (cm) ┆ petal length (cm) ┆ petal width (cm) ┆ target │
+            │ ---               ┆ ---              ┆ ---               ┆ ---              ┆ ---    │
+            │ Float64           ┆ Float64          ┆ Float64           ┆ Float64          ┆ Int64  │
+            ╞═══════════════════╪══════════════════╪═══════════════════╪══════════════════╪════════╡
+            │ 5.1               ┆ 3.5              ┆ 1.4               ┆ 0.2              ┆ 0      │
+            ├╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌┼╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌┼╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌┼╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌┼╌╌╌╌╌╌╌╌┤
+            │ 4.9               ┆ 3                ┆ 1.4               ┆ 0.2              ┆ 0      │
+            ├╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌┼╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌┼╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌┼╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌┼╌╌╌╌╌╌╌╌┤
+            │ 4.7               ┆ 3.2              ┆ 1.3               ┆ 0.2              ┆ 0      │
+            ├╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌┼╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌┼╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌┼╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌┼╌╌╌╌╌╌╌╌┤
+            │ 4.6               ┆ 3.1              ┆ 1.5               ┆ 0.2              ┆ 0      │
+            ├╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌┼╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌┼╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌┼╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌┼╌╌╌╌╌╌╌╌┤
+            │ 5                 ┆ 3.6              ┆ 1.4               ┆ 0.2              ┆ 0      │
+            ├╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌┼╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌┼╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌┼╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌┼╌╌╌╌╌╌╌╌┤
+            │ 5.4               ┆ 3.9              ┆ 1.7               ┆ 0.4              ┆ 0      │
+            ├╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌┼╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌┼╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌┼╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌┼╌╌╌╌╌╌╌╌┤
+            │ 4.6               ┆ 3.4              ┆ 1.4               ┆ 0.3              ┆ 0      │
+            ├╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌┼╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌┼╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌┼╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌┼╌╌╌╌╌╌╌╌┤
+            │ 5                 ┆ 3.4              ┆ 1.5               ┆ 0.2              ┆ 0      │
+            ╰───────────────────┴──────────────────┴───────────────────┴──────────────────┴────────╯
+
+            (Showing first 8 of 150 rows)
+
+
     .. tab-item:: Spark
 
         To convert a :class:`~ray.data.dataset.Dataset` to a `Spark DataFrame

doc/source/images/daft.webp

14.7 KB
Binary file not shown.

doc/source/ray-more-libs/index.rst

Lines changed: 1 addition & 0 deletions
@@ -27,6 +27,7 @@ Ray has a variety of additional integrations with ecosystem libraries.
 - :ref:`spark-on-ray`
 - :ref:`mars-on-ray`
 - :ref:`modin-on-ray`
+- `daft <https://www.getdaft.io>`_
 
 .. _air-ecosystem-map:
 

doc/source/ray-overview/ray-libraries.rst

Lines changed: 21 additions & 0 deletions
@@ -81,6 +81,27 @@ more information.
             Classy Vision Integration
 
 
+    .. grid-item-card::
+
+        .. figure:: ../images/daft.webp
+            :class: card-figure
+
+        .. div::
+
+            .. image:: https://img.shields.io/github/stars/daft/daft?style=social)]
+                :target: https://github.com/Eventual-Inc/Daft
+
+            Daft is a data engine that supports SQL and Python DataFrames for data processing and analytics natively on your Ray clusters.
+
+        +++
+        .. button-link:: https://www.getdaft.io
+            :color: primary
+            :outline:
+            :expand:
+
+            Daft Integration
+
+
     .. grid-item-card::
 
         .. figure:: ../images/dask.png

python/ray/data/BUILD

Lines changed: 14 additions & 0 deletions
@@ -1265,3 +1265,17 @@ py_test(
         "//:ray_lib",
     ],
 )
+
+py_test(
+    name = "test_daft",
+    size = "small",
+    srcs = ["tests/test_daft.py"],
+    tags = [
+        "exclusive",
+        "team:data",
+    ],
+    deps = [
+        ":conftest",
+        "//:ray_lib",
+    ],
+)
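
Note on the target above: it registers `tests/test_daft.py`, which this excerpt does not show. As a rough illustration of the "data round-trip" the commit message mentions, here is a minimal sketch. The helper names `make_columns`, `round_trip`, and `same_rows` are hypothetical and are not taken from the actual test file. The sketch assumes Ray has been started before the Daft DataFrame is built, as in the `loading-data.rst` example, and it only covers scalar columns, not tensors.

```python
"""Hypothetical round-trip check; not the contents of tests/test_daft.py."""


def make_columns(n):
    """Build the same two-column payload the loading-data.rst example uses."""
    return {
        "int_col": list(range(n)),
        "str_col": [str(i) for i in range(n)],
    }


def round_trip(columns):
    """Daft DataFrame -> Ray Dataset -> Daft DataFrame -> plain dict.

    Imports are local so this module loads without ray or daft installed.
    """
    import daft
    import ray

    # Assumption: mirror the docs example, which starts Ray first.
    ray.init(ignore_reinit_error=True)

    df = daft.from_pydict(columns)
    ds = ray.data.from_daft(df)  # API added by this commit
    return ds.to_daft().to_pydict()  # API added by this commit


def same_rows(left, right):
    """Compare two column dicts as unordered collections of rows."""

    def rows(cols):
        names = sorted(cols)
        return sorted(zip(*(cols[name] for name in names)))

    return sorted(left) == sorted(right) and rows(left) == rows(right)
```

With both libraries installed, a check along the lines of `same_rows(round_trip(make_columns(10)), make_columns(10))` would be the natural assertion. Rows are compared as an unordered collection because this sketch does not assume that ordering survives the conversion.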

python/ray/data/__init__.py

Lines changed: 2 additions & 0 deletions
@@ -29,6 +29,7 @@
     from_arrow,
     from_arrow_refs,
     from_blocks,
+    from_daft,
     from_dask,
     from_huggingface,
     from_items,
@@ -124,6 +125,7 @@
     "ReadTask",
     "RowBasedFileDatasink",
     "Schema",
+    "from_daft",
     "from_dask",
     "from_items",
     "from_arrow",

python/ray/data/dataset.py

Lines changed: 19 additions & 0 deletions
@@ -113,6 +113,7 @@
 from ray.widgets.util import repr_with_fallback
 
 if TYPE_CHECKING:
+    import daft
     import dask
     import mars
     import modin
@@ -4775,6 +4776,24 @@ def to_tf(
             additional_type_spec=additional_type_spec,
         )
 
+    @ConsumptionAPI(pattern="Time complexity:")
+    @PublicAPI(api_group=IOC_API_GROUP)
+    def to_daft(self) -> "daft.DataFrame":
+        """Convert this :class:`~ray.data.Dataset` into a
+        `Daft DataFrame <https://www.getdaft.io/projects/docs/en/stable/api_docs/dataframe.html>`_.
+
+        This will convert all the data inside the Ray Dataset into a Daft DataFrame in a zero-copy way
+        (using Arrow as the intermediate data format).
+
+        Time complexity: O(dataset size / parallelism)
+
+        Returns:
+            A `Daft DataFrame`_ created from this dataset.
+        """
+        import daft
+
+        return daft.from_ray_dataset(self)
+
     @ConsumptionAPI(pattern="Time complexity:")
     @PublicAPI(api_group=IOC_API_GROUP)
     def to_dask(
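
Note on the hunk above: `daft` is imported twice, once under `TYPE_CHECKING` for the `"daft.DataFrame"` annotation and once inside `to_daft` itself. This appears to follow the same pattern as the neighboring `dask`, `mars`, and `modin` imports, which would keep Daft an optional dependency. A minimal sketch of that pattern, using a hypothetical `MiniDataset` class that stands in for `ray.data.Dataset` and is not part of Ray:

```python
from typing import TYPE_CHECKING, Dict, List

if TYPE_CHECKING:
    # Seen only by static type checkers; never executed at runtime.
    import daft


class MiniDataset:
    """Hypothetical stand-in for ray.data.Dataset, for illustration only."""

    def __init__(self, columns: Dict[str, List]):
        self._columns = columns

    def to_daft(self) -> "daft.DataFrame":
        # Deferred import: daft is needed only when this method is called,
        # so constructing MiniDataset works without daft installed.
        import daft

        return daft.from_pydict(self._columns)


# Constructing the object does not import daft.
ds = MiniDataset({"x": [1, 2, 3]})
```

Because the return annotation is a string, it is never evaluated at import time. A missing `daft` install would surface as an `ImportError` only when `to_daft()` is actually called.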
