Skip to content

Commit 0c2f34b

Browse files
committed
Add support for mode function
1 parent 0a42ba8 commit 0c2f34b

File tree

10 files changed

+490
-72
lines changed

10 files changed

+490
-72
lines changed

docs/source/reference/dataframe/frame.rst

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -127,6 +127,7 @@ Computations / descriptive stats
127127
DataFrame.max
128128
DataFrame.mean
129129
DataFrame.min
130+
DataFrame.mode
130131
DataFrame.nunique
131132
DataFrame.pct_change
132133
DataFrame.prod

docs/source/reference/dataframe/series.rst

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -125,6 +125,7 @@ Computations / descriptive stats
125125
Series.mean
126126
Series.median
127127
Series.min
128+
Series.mode
128129
Series.pct_change
129130
Series.prod
130131
Series.product

mars/dataframe/reduction/__init__.py

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -62,6 +62,7 @@ def _install():
6262
from .skew import skew_dataframe, skew_series
6363
from .kurtosis import kurt_dataframe, kurt_series
6464
from .reduction_size import size_dataframe, size_series
65+
from .mode import mode_dataframe, mode_series
6566

6667
funcs = [
6768
("sum", sum_series, sum_dataframe),
@@ -88,6 +89,7 @@ def _install():
8889
("kurtosis", kurt_series, kurt_dataframe),
8990
("unique", unique, None),
9091
("_reduction_size", size_dataframe, size_series),
92+
("mode", mode_series, mode_dataframe),
9193
]
9294
for func_name, series_func, df_func in funcs:
9395
if df_func is not None: # pragma: no branch

mars/dataframe/reduction/aggregation.py

Lines changed: 77 additions & 41 deletions
Original file line numberDiff line numberDiff line change
@@ -79,6 +79,7 @@ def where_function(cond, var1, var2):
7979
"kurt": lambda x, skipna=True, bias=False: x.kurt(skipna=skipna, bias=bias),
8080
"kurtosis": lambda x, skipna=True, bias=False: x.kurtosis(skipna=skipna, bias=bias),
8181
"nunique": lambda x: x.nunique(),
82+
"mode": lambda x, skipna=True: x.mode(skipna=skipna),
8283
}
8384

8485

@@ -126,53 +127,76 @@ def _calc_result_shape(self, df):
126127

127128
if isinstance(result_df, pd.DataFrame):
128129
self.output_types = [OutputType.dataframe]
129-
return result_df.dtypes, result_df.index
130+
return result_df.dtypes, result_df.index, 2
130131
elif isinstance(result_df, pd.Series):
131132
self.output_types = [OutputType.series]
132-
return pd.Series([result_df.dtype], index=[result_df.name]), result_df.index
133+
return (
134+
pd.Series([result_df.dtype], index=[result_df.name]),
135+
result_df.index,
136+
1,
137+
)
133138
else:
134139
self.output_types = [OutputType.scalar]
135-
return np.array(result_df).dtype, None
140+
return np.array(result_df).dtype, None, 0
136141

137142
def __call__(self, df, output_type=None, dtypes=None, index=None):
138143
self._output_types = df.op.output_types
139144
normalize_reduction_funcs(self, ndim=df.ndim)
140145
if output_type is None or dtypes is None:
141146
with enter_mode(kernel=False, build=False):
142-
dtypes, index = self._calc_result_shape(df)
147+
dtypes, index, out_ndim = self._calc_result_shape(df)
143148
else:
144149
self.output_types = [output_type]
150+
if output_type == OutputType.dataframe:
151+
out_ndim = 2
152+
elif output_type == OutputType.series:
153+
out_ndim = 1
154+
else:
155+
out_ndim = 0
145156

157+
reduced_len = (
158+
1 if df.ndim != out_ndim or isinstance(self.raw_func, list) else np.nan
159+
)
146160
if self.output_types[0] == OutputType.dataframe:
147161
if self.axis == 0:
148-
new_shape = (len(index), len(dtypes))
149-
new_index = parse_index(index, store_data=True)
162+
new_shape = (len(index) * reduced_len, len(dtypes))
163+
new_index_value = parse_index(index, store_data=True)
164+
new_dtypes = dtypes
165+
new_col_name = parse_index(dtypes.index, store_data=True)
150166
else:
151-
new_shape = (df.shape[0], len(dtypes))
152-
new_index = df.index_value
167+
new_shape = (df.shape[0], len(dtypes) * reduced_len)
168+
new_index_value = df.index_value
169+
new_dtypes = None if np.isnan(reduced_len) else dtypes
170+
new_col_name = parse_index(
171+
dtypes.index, store_data=not np.isnan(reduced_len)
172+
)
153173
return self.new_dataframe(
154174
[df],
155175
shape=new_shape,
156-
dtypes=dtypes,
157-
index_value=new_index,
158-
columns_value=parse_index(dtypes.index, store_data=True),
176+
dtypes=new_dtypes,
177+
index_value=new_index_value,
178+
columns_value=new_col_name,
159179
)
160180
elif self.output_types[0] == OutputType.series:
161181
if df.ndim == 1:
162-
new_shape = (len(index),)
163-
new_index = parse_index(index, store_data=True)
182+
new_shape = (len(index) * reduced_len,)
183+
new_index_value = parse_index(
184+
index, store_data=not np.isnan(reduced_len)
185+
)
164186
elif self.axis == 0:
165-
new_shape = (len(index),)
166-
new_index = parse_index(index, store_data=True)
187+
new_shape = (len(index) * reduced_len,)
188+
new_index_value = parse_index(
189+
index, store_data=not np.isnan(reduced_len)
190+
)
167191
else:
168192
new_shape = (df.shape[0],)
169-
new_index = df.index_value
193+
new_index_value = df.index_value
170194
return self.new_series(
171195
[df],
172196
shape=new_shape,
173197
dtype=dtypes[0],
174198
name=dtypes.index[0],
175-
index_value=new_index,
199+
index_value=new_index_value,
176200
)
177201
elif self.output_types[0] == OutputType.tensor:
178202
return self.new_tileable([df], dtype=dtypes, shape=(np.nan,))
@@ -208,6 +232,9 @@ def _gen_map_chunks(
208232

209233
agg_chunks = np.empty(agg_chunks_shape, dtype=object)
210234
dtypes_cache = dict()
235+
reduced_len = (
236+
1 if in_df.ndim != out_df.ndim or isinstance(op.raw_func, list) else np.nan
237+
)
211238
for chunk in in_df.chunks:
212239
input_index = chunk.index[1 - axis] if len(chunk.index) > 1 else 0
213240
if input_index not in input_index_to_output:
@@ -234,9 +261,9 @@ def _gen_map_chunks(
234261

235262
if map_op.output_types[0] == OutputType.dataframe:
236263
if axis == 0:
237-
shape = (1, out_df.shape[-1])
264+
shape = (reduced_len, chunk.shape[-1])
238265
if out_df.ndim == 2:
239-
columns_value = out_df.columns_value
266+
columns_value = chunk.columns_value
240267
index_value = out_df.index_value
241268
else:
242269
columns_value = out_df.index_value
@@ -259,11 +286,11 @@ def _gen_map_chunks(
259286
index_value=index_value,
260287
)
261288
else:
262-
shape = (out_df.shape[0], 1)
289+
shape = (chunk.shape[0], reduced_len)
263290
columns_value = parse_index(
264291
pd.Index([0]), out_df.key, store_data=True
265292
)
266-
index_value = out_df.index_value
293+
index_value = chunk.index_value
267294

268295
agg_chunk = map_op.new_chunk(
269296
[chunk],
@@ -273,7 +300,9 @@ def _gen_map_chunks(
273300
index_value=index_value,
274301
)
275302
else:
276-
agg_chunk = map_op.new_chunk([chunk], shape=(1,), index=new_index)
303+
agg_chunk = map_op.new_chunk(
304+
[chunk], shape=(reduced_len,), index=new_index
305+
)
277306
agg_chunks[agg_chunk.index] = agg_chunk
278307
return agg_chunks
279308

@@ -409,6 +438,9 @@ def _tile_tree(cls, op: "DataFrameAggregate"):
409438
chunks = cls._gen_map_chunks(
410439
op, in_df, out_df, axis_func_infos, input_index_to_output
411440
)
441+
reduced_len = (
442+
1 if in_df.ndim != out_df.ndim or isinstance(op.raw_func, list) else np.nan
443+
)
412444
while chunks.shape[axis] > combine_size:
413445
if axis == 0:
414446
new_chunks_shape = (
@@ -429,16 +461,16 @@ def _tile_tree(cls, op: "DataFrameAggregate"):
429461
chks = chunks[i : i + combine_size, idx1]
430462
chunk_index = (idx0, idx1)
431463
if chks[0].ndim == 1:
432-
concat_shape = (len(chks),)
433-
agg_shape = (1,)
464+
concat_shape = (len(chks) * reduced_len,)
465+
agg_shape = (reduced_len,)
434466
else:
435-
concat_shape = (len(chks), chks[0].shape[1])
436-
agg_shape = (chks[0].shape[1], 1)
467+
concat_shape = (len(chks) * reduced_len, chks[0].shape[1])
468+
agg_shape = (chks[0].shape[1], reduced_len)
437469
else:
438470
chks = chunks[idx1, i : i + combine_size]
439471
chunk_index = (idx1, idx0)
440-
concat_shape = (chks[0].shape[0], len(chks))
441-
agg_shape = (chks[0].shape[0], 1)
472+
concat_shape = (chks[0].shape[0], len(chks) * reduced_len)
473+
agg_shape = (chks[0].shape[0], reduced_len)
442474

443475
chks = chks.reshape((chks.shape[0],)).tolist()
444476
if len(chks) == 1:
@@ -485,12 +517,12 @@ def _tile_tree(cls, op: "DataFrameAggregate"):
485517
if axis == 0:
486518
chks = chunks[:, idx]
487519
if chks[0].ndim == 1:
488-
concat_shape = (len(chks),)
520+
concat_shape = (len(chks) * reduced_len,)
489521
else:
490-
concat_shape = (len(chks), chks[0].shape[1])
522+
concat_shape = (len(chks) * reduced_len, chks[0].shape[1])
491523
else:
492524
chks = chunks[idx, :]
493-
concat_shape = (chks[0].shape[0], len(chks))
525+
concat_shape = (chks[0].shape[0], len(chks) * reduced_len)
494526
chks = chks.reshape((chks.shape[0],)).tolist()
495527
chk = concat_op.new_chunk(
496528
chks,
@@ -519,7 +551,7 @@ def _tile_tree(cls, op: "DataFrameAggregate"):
519551
shape_len = len(col_index)
520552
kw.update(
521553
dict(
522-
shape=(out_df.shape[0], shape_len),
554+
shape=(out_df.shape[0] * reduced_len, shape_len),
523555
columns_value=columns_value,
524556
index=(0, idx),
525557
dtypes=out_df.dtypes[columns_value.to_pandas()],
@@ -531,7 +563,10 @@ def _tile_tree(cls, op: "DataFrameAggregate"):
531563
dict(
532564
index=(idx, 0),
533565
index_value=src_col_chunk.index_value,
534-
shape=(src_col_chunk.shape[0], out_df.shape[1]),
566+
shape=(
567+
src_col_chunk.shape[0],
568+
out_df.shape[1] * reduced_len,
569+
),
535570
dtypes=out_df.dtypes,
536571
)
537572
)
@@ -843,25 +878,26 @@ def _execute_agg(cls, ctx, op: "DataFrameAggregate"):
843878
concat_df = concat_df.iloc[:, 0]
844879
else:
845880
concat_df = concat_df.iloc[:, 0]
846-
concat_df.name = op.outputs[0].name
881+
concat_df.name = out.name
847882

848-
concat_df = concat_df.astype(op.outputs[0].dtype, copy=False)
883+
concat_df = concat_df.astype(out.dtype, copy=False)
849884
elif op.output_types[0] == OutputType.scalar:
850885
concat_df = concat_df.iloc[0]
851886
try:
852-
concat_df = concat_df.astype(op.outputs[0].dtype)
887+
concat_df = concat_df.astype(out.dtype)
853888
except AttributeError:
854889
# concat_df may be a string and has no `astype` method
855890
pass
856891
elif op.output_types[0] == OutputType.tensor:
857892
concat_df = xp.array(concat_df).astype(dtype=out.dtype)
858893
else:
859-
if axis == 0:
860-
concat_df = concat_df.reindex(op.outputs[0].index_value.to_pandas())
861-
else:
862-
concat_df = concat_df[op.outputs[0].columns_value.to_pandas()]
894+
if not np.isnan(out.shape[op.axis]):
895+
if axis == 0:
896+
concat_df = concat_df.reindex(out.index_value.to_pandas())
897+
else:
898+
concat_df = concat_df[out.columns_value.to_pandas()]
863899

864-
concat_df = concat_df.astype(op.outputs[0].dtypes, copy=False)
900+
concat_df = concat_df.astype(out.dtypes, copy=False)
865901
ctx[op.outputs[0].key] = concat_df
866902

867903
@classmethod

0 commit comments

Comments (0)