Skip to content

Commit 0ef266d

Browse files
committed
Fix case
1 parent ecdfe8c commit 0ef266d

File tree

8 files changed

+76
-34
lines changed

8 files changed

+76
-34
lines changed

mars/dataframe/base/bloom_filter.py

+1-1
Original file line numberDiff line numberDiff line change
@@ -175,7 +175,7 @@ def _build_dataframe_filter(cls, in_data: pd.DataFrame, op: "DataFrameBloomFilte
175175
def _convert_to_hashable_dtypes(cls, dtypes: pd.Series):
176176
dtypes = dict(
177177
(name, dtype) if np.issubdtype(dtype, int) else (name, str)
178-
for name, dtype in dtypes.iteritems()
178+
for name, dtype in dtypes.items()
179179
)
180180
return dtypes
181181

mars/dataframe/reduction/aggregation.py

+29-12
Original file line numberDiff line numberDiff line change
@@ -139,6 +139,15 @@ def _calc_result_shape(self, df):
139139
self.output_types = [OutputType.scalar]
140140
return np.array(result_df).dtype, None, 0
141141

142+
def _get_reduced_dim_unit(self, in_ndim, out_ndim):
143+
"""
144+
If rows can be reduced into multiple columns, return nan,
145+
otherwise returns 1
146+
"""
147+
if not isinstance(self.raw_func, str) and isinstance(self.raw_func, Iterable):
148+
return 1
149+
return 1 if in_ndim != out_ndim else np.nan
150+
142151
def __call__(self, df, output_type=None, dtypes=None, index=None):
143152
self._output_types = df.op.output_types
144153
normalize_reduction_funcs(self, ndim=df.ndim)
@@ -154,9 +163,7 @@ def __call__(self, df, output_type=None, dtypes=None, index=None):
154163
else:
155164
out_ndim = 0
156165

157-
reduced_len = (
158-
1 if df.ndim != out_ndim or isinstance(self.raw_func, list) else np.nan
159-
)
166+
reduced_len = self._get_reduced_dim_unit(df.ndim, out_ndim)
160167
if self.output_types[0] == OutputType.dataframe:
161168
if self.axis == 0:
162169
new_shape = (len(index) * reduced_len, len(dtypes))
@@ -213,7 +220,7 @@ def _safe_append(d, key, val):
213220
@classmethod
214221
def _gen_map_chunks(
215222
cls,
216-
op,
223+
op: "DataFrameAggregate",
217224
in_df,
218225
out_df,
219226
func_infos: List[ReductionSteps],
@@ -232,9 +239,7 @@ def _gen_map_chunks(
232239

233240
agg_chunks = np.empty(agg_chunks_shape, dtype=object)
234241
dtypes_cache = dict()
235-
reduced_len = (
236-
1 if in_df.ndim != out_df.ndim or isinstance(op.raw_func, list) else np.nan
237-
)
242+
reduced_len = op._get_reduced_dim_unit(in_df.ndim, out_df.ndim)
238243
for chunk in in_df.chunks:
239244
input_index = chunk.index[1 - axis] if len(chunk.index) > 1 else 0
240245
if input_index not in input_index_to_output:
@@ -438,9 +443,7 @@ def _tile_tree(cls, op: "DataFrameAggregate"):
438443
chunks = cls._gen_map_chunks(
439444
op, in_df, out_df, axis_func_infos, input_index_to_output
440445
)
441-
reduced_len = (
442-
1 if in_df.ndim != out_df.ndim or isinstance(op.raw_func, list) else np.nan
443-
)
446+
reduced_len = op._get_reduced_dim_unit(in_df.ndim, out_df.ndim)
444447
while chunks.shape[axis] > combine_size:
445448
if axis == 0:
446449
new_chunks_shape = (
@@ -715,6 +718,8 @@ def _do_predefined_agg(cls, op: "DataFrameAggregate", input_obj, func_name, kwds
715718
raise NotImplementedError("numeric_only not implemented under cudf")
716719
if isinstance(input_obj, pd.Index):
717720
kwds.pop("skipna", None)
721+
if getattr(input_obj, "ndim", 0) > 1:
722+
kwds["axis"] = op.axis
718723
return getattr(input_obj, func_name)(**kwds)
719724

720725
@classmethod
@@ -865,7 +870,19 @@ def _execute_agg(cls, ctx, op: "DataFrameAggregate"):
865870
ser_index = None
866871
if agg_series_ndim < out.ndim:
867872
ser_index = [func_name]
868-
aggs.append(cls._wrap_df(op, agg_series, index=ser_index))
873+
if (
874+
isinstance(agg_series, np.ndarray)
875+
and getattr(func_inputs[0], "ndim", 0) >= 1
876+
and hasattr(func_inputs[0], "index")
877+
):
878+
agg_series = cls._wrap_df(op, agg_series, index=ser_index)
879+
if op.axis == 0:
880+
agg_series.columns = func_inputs[0].index
881+
else:
882+
agg_series.index = func_inputs[0].index
883+
else:
884+
agg_series = cls._wrap_df(op, agg_series, index=ser_index)
885+
aggs.append(agg_series)
869886

870887
# concatenate to produce final result
871888
concat_df = xdf.concat(aggs, axis=axis)
@@ -931,7 +948,7 @@ def execute(cls, ctx, op: "DataFrameAggregate"):
931948
):
932949
result = op.func[0](in_data)
933950
else:
934-
result = in_data.agg(op.raw_func, axis=op.axis)
951+
result = in_data.agg(op.raw_func, axis=op.axis, **op.raw_func_kw)
935952
if op.outputs[0].ndim == 1:
936953
result = result.astype(op.outputs[0].dtype, copy=False)
937954

mars/dataframe/reduction/mode.py

+9-5
Original file line numberDiff line numberDiff line change
@@ -41,14 +41,20 @@ def __init__(
4141
self._numeric_only = numeric_only
4242
self._dropna = dropna
4343

44+
@staticmethod
45+
def _explode_dict_series(s: pd.Series) -> pd.DataFrame:
46+
exploded = s.apply(pd.Series)
47+
# if exploded.columns.hasnans:
48+
return exploded
49+
4450
def pre(self, in_data): # noqa: W0221 # pylint: disable=arguments-differ
4551
xdf = cudf if self.is_gpu() else pd
4652
if isinstance(in_data, xdf.Series):
4753
return in_data.value_counts(dropna=self._dropna)
4854
else:
4955
if self._axis == 0:
5056
data = dict()
51-
for d, v in in_data.iteritems():
57+
for d, v in in_data.items():
5258
data[d] = [v.value_counts(dropna=self._dropna).to_dict()]
5359
df = xdf.DataFrame(data)
5460
else:
@@ -64,7 +70,7 @@ def agg(self, in_data): # noqa: W0221 # pylint: disable=arguments-differ
6470
else:
6571
if self._axis == 0:
6672
data = dict()
67-
for d, v in in_data.iteritems():
73+
for d, v in in_data.items():
6874
data[d] = [v.apply(pd.Series).sum().to_dict()]
6975
df = xdf.DataFrame(data)
7076
else:
@@ -85,9 +91,7 @@ def _handle_series(s):
8591
if isinstance(in_data, xdf.Series):
8692
return _handle_series(in_data)
8793
else:
88-
in_data_iter = (
89-
in_data.iteritems() if self._axis == 0 else in_data.iterrows()
90-
)
94+
in_data_iter = in_data.items() if self._axis == 0 else in_data.iterrows()
9195
s_list = []
9296
for d, v in in_data_iter:
9397
if isinstance(v.dtype, ArrowListDtype):

mars/dataframe/reduction/nunique.py

+3-5
Original file line numberDiff line numberDiff line change
@@ -59,7 +59,7 @@ def pre(self, in_data): # noqa: W0221 # pylint: disable=arguments-differ
5959
else:
6060
if self._axis == 0:
6161
data = dict()
62-
for d, v in in_data.iteritems():
62+
for d, v in in_data.items():
6363
if not self._use_arrow_dtype or xdf is cudf:
6464
data[d] = [v.drop_duplicates().to_list()]
6565
else:
@@ -82,7 +82,7 @@ def agg(self, in_data): # noqa: W0221 # pylint: disable=arguments-differ
8282
else:
8383
if self._axis == 0:
8484
data = dict()
85-
for d, v in in_data.iteritems():
85+
for d, v in in_data.items():
8686
if not self._use_arrow_dtype or xdf is cudf:
8787
data[d] = [v.explode().drop_duplicates().to_list()]
8888
else:
@@ -103,9 +103,7 @@ def post(self, in_data): # noqa: W0221 # pylint: disable=arguments-differ
103103
if isinstance(in_data, xdf.Series):
104104
return in_data.explode().nunique(dropna=self._dropna)
105105
else:
106-
in_data_iter = (
107-
in_data.iteritems() if self._axis == 0 else in_data.iterrows()
108-
)
106+
in_data_iter = in_data.items() if self._axis == 0 else in_data.iterrows()
109107
data = dict()
110108
for d, v in in_data_iter:
111109
if isinstance(v.dtype, ArrowListDtype):

mars/dataframe/reduction/tests/test_reduction_execution.py

+28-7
Original file line numberDiff line numberDiff line change
@@ -36,6 +36,7 @@
3636
cp = lazy_import("cupy", rename="cp")
3737
_agg_size_as_series = pd_release_version >= (1, 3)
3838
_support_kw_agg = pd_release_version >= (1, 1)
39+
_drop_level_reduction = pd_release_version >= (2, 0)
3940

4041

4142
@pytest.fixture
@@ -119,6 +120,9 @@ def compute(data, **kwargs):
119120
np.testing.assert_equal(r.execute().fetch(), compute(data))
120121

121122

123+
@pytest.mark.skipif(
124+
_drop_level_reduction, reason="Level reduction not supported for pandas>=2.0"
125+
)
122126
@pytest.mark.parametrize("func_name,func_opts", reduction_functions)
123127
def test_series_level_reduction(setup, func_name, func_opts: FunctionOptions):
124128
def compute(data, **kwargs):
@@ -162,6 +166,9 @@ def compute(data, **kwargs):
162166
)
163167

164168

169+
@pytest.mark.skipif(
170+
_drop_level_reduction, reason="Level reduction not supported for pandas>=2.0"
171+
)
165172
@pytest.mark.parametrize("func_name,func_opts", reduction_functions)
166173
def test_dataframe_reduction(
167174
setup, check_ref_counts, func_name, func_opts: FunctionOptions
@@ -255,6 +262,9 @@ def compute(data, **kwargs):
255262
)
256263

257264

265+
@pytest.mark.skipif(
266+
_drop_level_reduction, reason="Level reduction not supported for pandas>=2.0"
267+
)
258268
@pytest.mark.parametrize("func_name,func_opts", reduction_functions)
259269
def test_dataframe_level_reduction(
260270
setup, check_ref_counts, func_name, func_opts: FunctionOptions
@@ -403,6 +413,9 @@ def compute(data, **kwargs):
403413
assert r.execute().fetch() is True
404414

405415

416+
@pytest.mark.skipif(
417+
_drop_level_reduction, reason="Level reduction not supported for pandas>=2.0"
418+
)
406419
@pytest.mark.parametrize("func_name", bool_reduction_functions)
407420
def test_series_bool_level_reduction(setup, check_ref_counts, func_name):
408421
def compute(data, **kwargs):
@@ -510,6 +523,9 @@ def compute(data, **kwargs):
510523
)
511524

512525

526+
@pytest.mark.skipif(
527+
_drop_level_reduction, reason="Level reduction not supported for pandas>=2.0"
528+
)
513529
@pytest.mark.parametrize("func_name", bool_reduction_functions)
514530
def test_dataframe_bool_level_reduction(setup, check_ref_counts, func_name):
515531
def compute(data, **kwargs):
@@ -685,18 +701,20 @@ def test_mode(setup, check_ref_counts):
685701
config_kw = {
686702
"extra_config": {
687703
"check_shape": False,
704+
"check_dtypes": False,
705+
"check_columns_value": False,
688706
"check_index_value": False,
689707
}
690708
}
691709
data1 = pd.Series(np.random.randint(0, 5, size=(20,)))
692710

693711
series = md.Series(data1)
694-
result = series.mode().execute().fetch()
712+
result = series.mode().execute(**config_kw).fetch(**config_kw)
695713
expected = data1.mode()
696714
pd.testing.assert_series_equal(result, expected)
697715

698716
series = md.Series(data1, chunk_size=6)
699-
result = series.mode().execute().fetch()
717+
result = series.mode().execute(**config_kw).fetch(**config_kw)
700718
expected = data1.mode()
701719
pd.testing.assert_series_equal(result, expected)
702720

@@ -705,7 +723,7 @@ def test_mode(setup, check_ref_counts):
705723
data2[[2, 9, 18]] = np.nan
706724

707725
series = md.Series(data2)
708-
result = series.mode().execute().fetch()
726+
result = series.mode().execute(**config_kw).fetch(**config_kw)
709727
expected = data2.mode()
710728
pd.testing.assert_series_equal(result, expected)
711729

@@ -720,7 +738,7 @@ def test_mode(setup, check_ref_counts):
720738
columns=["c" + str(i) for i in range(20)],
721739
)
722740
df = md.DataFrame(data1)
723-
result = df.mode().execute().fetch()
741+
result = df.mode().execute(**config_kw).fetch(**config_kw)
724742
expected = data1.mode()
725743
pd.testing.assert_frame_equal(result, expected)
726744

@@ -730,7 +748,7 @@ def test_mode(setup, check_ref_counts):
730748
pd.testing.assert_frame_equal(result, expected)
731749

732750
df = md.DataFrame(data1)
733-
result = df.mode(axis=1).execute().fetch()
751+
result = df.mode(axis=1).execute(**config_kw).fetch(**config_kw)
734752
expected = data1.mode(axis=1)
735753
pd.testing.assert_frame_equal(result, expected)
736754

@@ -744,7 +762,7 @@ def test_mode(setup, check_ref_counts):
744762
data2.iloc[[2, 9, 18], [2, 9, 18]] = np.nan
745763

746764
df = md.DataFrame(data2)
747-
result = df.mode().execute().fetch()
765+
result = df.mode().execute(**config_kw).fetch(**config_kw)
748766
expected = data2.mode()
749767
pd.testing.assert_frame_equal(result, expected)
750768

@@ -1008,7 +1026,10 @@ def test_dataframe_aggregate(setup, check_ref_counts):
10081026
mean_9=NamedAgg(9, "mean"),
10091027
)
10101028
result = df.agg(**agg_kw)
1011-
pd.testing.assert_frame_equal(result.execute().fetch(), data.agg(**agg_kw))
1029+
pd.testing.assert_frame_equal(
1030+
result.execute().fetch(extra_config={"check_shape": False}),
1031+
data.agg(**agg_kw),
1032+
)
10121033

10131034

10141035
def test_series_aggregate(setup, check_ref_counts):

mars/deploy/oscar/session.py

+2
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,7 @@
1313
# limitations under the License.
1414

1515
import asyncio
16+
import copy
1617
import itertools
1718
import logging
1819
import json
@@ -355,6 +356,7 @@ async def _run_in_background(
355356
async def execute(self, *tileables, **kwargs) -> ExecutionInfo:
356357
if self._closed:
357358
raise RuntimeError("Session closed already")
359+
kwargs = copy.deepcopy(kwargs)
358360
fuse_enabled: bool = kwargs.pop("fuse_enabled", None)
359361
extra_config: dict = kwargs.pop("extra_config", None)
360362
warn_duplicated_execution: bool = kwargs.pop("warn_duplicated_execution", False)

mars/deploy/oscar/tests/session.py

+2
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,7 @@
1313
# limitations under the License.
1414

1515
import asyncio
16+
import copy
1617
import inspect
1718
import os
1819
import uuid
@@ -62,6 +63,7 @@ def _process_result(self, tileable, result):
6263
return super()._process_result(tileable, result)
6364

6465
async def fetch(self, *tileables, **kwargs):
66+
kwargs = copy.deepcopy(kwargs)
6567
extra_config = kwargs.pop("extra_config", dict())
6668
if kwargs:
6769
unexpected_keys = ", ".join(list(kwargs.keys()))

mars/tests/test_session.py

+2-4
Original file line numberDiff line numberDiff line change
@@ -301,7 +301,7 @@ def test_iter(setup):
301301
raw_data = pd.DataFrame(np.random.randint(1000, size=(20, 10)))
302302
df = md.DataFrame(raw_data, chunk_size=5)
303303

304-
for col, series in df.iteritems():
304+
for col, series in df.items():
305305
pd.testing.assert_series_equal(series.execute().fetch(), raw_data[col])
306306

307307
for i, batch in enumerate(df.iterbatch(batch_size=15)):
@@ -331,9 +331,7 @@ def test_iter(setup):
331331
pd.testing.assert_series_equal(batch, raw_data.iloc[i * 15 : (i + 1) * 15])
332332

333333
i = 0
334-
for result_item, expect_item in zip(
335-
s.iteritems(batch_size=15), raw_data.iteritems()
336-
):
334+
for result_item, expect_item in zip(s.items(batch_size=15), raw_data.items()):
337335
assert result_item[0] == expect_item[0]
338336
assert result_item[1] == expect_item[1]
339337
i += 1

0 commit comments

Comments
 (0)