Skip to content

Commit 78628b6

Browse files
[Ray] Fix pandas schema parsing when reading Ray dataset (#2946)
* fix parsing pandas schema and add ut
* fix pandas schema import
* update ray
* Fix test_task_manager
* skip test_cut_execution for ray_dag
* fix coverage

Co-authored-by: zhongchun <zhongchunyu@gmail.com>
1 parent a464573 commit 78628b6

File tree

6 files changed

+60
-8
lines changed

6 files changed

+60
-8
lines changed

.github/workflows/platform-ci.yml

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -97,8 +97,12 @@ jobs:
9797
rm -fr /tmp/etcd-$ETCD_VER-linux-amd64.tar.gz /tmp/etcd-download-test
9898
fi
9999
if [ -n "$WITH_RAY" ] || [ -n "$WITH_RAY_DAG" ] || [ -n "$WITH_RAY_DEPLOY" ]; then
100-
pip install ray[default]==1.9.2 "protobuf<4"
101-
pip install "xgboost_ray==0.1.5" "xgboost<1.6.0"
100+
pip install "xgboost_ray==0.1.5" "xgboost<1.6.0" "protobuf<4"
101+
# Use standard ray releases when ownership bug is fixed
102+
pip uninstall -y ray
103+
pip install https://s3-us-west-2.amazonaws.com/ray-wheels/master/c03d0432f3bb40f3c597b7fc450870ba5e34ad56/ray-3.0.0.dev0-cp38-cp38-manylinux2014_x86_64.whl
104+
# Ray Datasets need pyarrow>=6.0.1
105+
pip install "pyarrow>=6.0.1"
102106
fi
103107
if [ -n "$RUN_DASK" ]; then
104108
pip install dask[complete] mimesis sklearn

mars/dataframe/base/tests/test_base_execution.py

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -775,7 +775,6 @@ def test_isin_execution(setup):
775775
pd.testing.assert_frame_equal(result, expected)
776776

777777

778-
@pytest.mark.ray_dag
779778
def test_cut_execution(setup):
780779
session = setup
781780

mars/dataframe/datasource/read_raydataset.py

Lines changed: 15 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -112,10 +112,23 @@ def __call__(self, index_value=None, columns_value=None, dtypes=None):
112112
def read_ray_dataset(ds, columns=None, incremental_index=False, **kwargs):
113113
assert isinstance(ds, real_ray_dataset.Dataset)
114114
refs = ds.to_pandas_refs()
115-
dtypes = ds.schema().empty_table().to_pandas().dtypes
115+
schema = ds.schema()
116+
117+
import pyarrow as pa
118+
119+
try:
120+
from ray.data.impl.pandas_block import PandasBlockSchema
121+
122+
if isinstance(schema, PandasBlockSchema):
123+
dtypes = pd.Series(schema.types, index=schema.names)
124+
elif isinstance(schema, pa.Schema):
125+
dtypes = schema.empty_table().to_pandas().dtypes
126+
else:
127+
raise NotImplementedError(f"Unsupported format of schema {schema}")
128+
except ImportError: # pragma: no cover
129+
dtypes = schema.empty_table().to_pandas().dtypes
116130
index_value = parse_index(pd.RangeIndex(-1))
117131
columns_value = parse_index(dtypes.index, store_data=True)
118-
119132
op = DataFrameReadRayDataset(
120133
refs=refs, columns=columns, incremental_index=incremental_index
121134
)

mars/dataframe/datasource/tests/test_datasource_execution.py

Lines changed: 30 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1178,6 +1178,36 @@ def test_read_raydataset(ray_start_regular, ray_create_mars_cluster):
11781178
mdf = md.read_ray_dataset(ds)
11791179
assert df.equals(mdf.execute().fetch())
11801180

1181+
n = 10000
1182+
pdf = pd.DataFrame({"a": list(range(n)), "b": list(range(n, 2 * n))})
1183+
df = md.DataFrame(pdf)
1184+
1185+
# Convert mars dataframe to ray dataset
1186+
ds = md.to_ray_dataset(df)
1187+
pd.testing.assert_frame_equal(ds.to_pandas(), df.to_pandas())
1188+
ds2 = ds.filter(lambda row: row["a"] % 2 == 0)
1189+
assert ds2.take(5) == [{"a": 2 * i, "b": n + 2 * i} for i in range(5)]
1190+
1191+
# Convert ray dataset to mars dataframe
1192+
df2 = md.read_ray_dataset(ds2)
1193+
pd.testing.assert_frame_equal(
1194+
df2.head(5).to_pandas(),
1195+
pd.DataFrame({"a": list(range(0, 10, 2)), "b": list(range(n, n + 10, 2))}),
1196+
)
1197+
1198+
# Test Arrow Dataset
1199+
pdf2 = pd.DataFrame({c: range(5) for c in "abc"})
1200+
ds3 = ray.data.from_arrow([pa.Table.from_pandas(pdf2) for _ in range(3)])
1201+
df3 = md.read_ray_dataset(ds3)
1202+
pd.testing.assert_frame_equal(
1203+
df3.head(5).to_pandas(),
1204+
pdf2,
1205+
)
1206+
1207+
# Test simple datasets
1208+
with pytest.raises(NotImplementedError):
1209+
ray.data.range(10).to_mars()
1210+
11811211

11821212
@require_ray
11831213
def test_read_ray_mldataset(ray_start_regular, ray_create_mars_cluster):

mars/services/task/execution/ray/config.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -40,7 +40,7 @@ def get_deploy_band_resources(self) -> List[Dict[str, Resource]]:
4040
return []
4141

4242
def get_subtask_max_retries(self):
43-
return self._ray_execution_config.get("subtask_max_retries")
43+
return self._ray_execution_config["subtask_max_retries"]
4444

4545
def get_n_cpu(self):
4646
return self._ray_execution_config["n_cpu"]
@@ -49,7 +49,7 @@ def get_n_worker(self):
4949
return self._ray_execution_config["n_worker"]
5050

5151
def get_subtask_cancel_timeout(self):
52-
return self._ray_execution_config.get("subtask_cancel_timeout")
52+
return self._ray_execution_config["subtask_cancel_timeout"]
5353

5454
def create_task_state_actor_as_needed(self):
5555
# Whether create RayTaskState actor as needed.

mars/services/task/supervisor/tests/test_task_manager.py

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -78,7 +78,13 @@ async def actor_pool():
7878
await MockMutableAPI.create(session_id, pool.external_address)
7979

8080
# create configuration
81-
config = ExecutionConfig.from_params(backend=backend, n_worker=1, n_cpu=2)
81+
config = ExecutionConfig.from_params(
82+
backend=backend,
83+
n_worker=1,
84+
n_cpu=2,
85+
subtask_max_retries=3,
86+
subtask_cancel_timeout=3,
87+
)
8288
await mo.create_actor(
8389
TaskConfigurationActor,
8490
dict(),

0 commit comments

Comments
 (0)