Commit 1aa6936

Add n_reducers and reducer_ordinal to shuffle operands (#3055)
1 parent b685973 commit 1aa6936

22 files changed: +147 −38 lines changed
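
The change applies one mechanical pattern across every shuffle site: wherever reduce chunks are created, the full list of reducer indices is now materialized up front, and each reduce operand receives n_reducers (the total count, i.e. the number of shuffle blocks a single mapper produces) and reducer_ordinal (its flat position in that enumeration).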

mars/core/operand/shuffle.py

Lines changed: 5 additions & 0 deletions

@@ -31,6 +31,11 @@ class MapReduceOperand(Operand):
     mapper_id = Int32Field("mapper_id", default=0)
     # for reducer
     reducer_index = TupleField("reducer_index", FieldTypes.uint64)
+    # Total number of reducers; also the number of shuffle blocks per mapper.
+    n_reducers = Int32Field("n_reducers")
+    # The ordinal of this reducer among all reducers; unlike reducer_index,
+    # it is a flat integer rather than a tuple.
+    reducer_ordinal = Int32Field("reducer_ordinal")
     reducer_phase = StringField("reducer_phase", default=None)

     def _new_chunks(self, inputs, kws=None, **kw):
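
To make the two new fields concrete, here is a minimal standalone sketch (not from the repository; the 2 x 3 chunk_shape is a made-up example value) of the invariant the tile sites below keep re-establishing: reducer_ordinal is the flat row-major position of reducer_index within the reducer grid, and n_reducers is the size of that grid.

import itertools

# Hypothetical reducer grid; the real code derives this from chunk_shape/out_shape.
chunk_shape = (2, 3)

# The same enumerate-over-product pattern used at each tile site below:
out_indices = list(itertools.product(*(range(s) for s in chunk_shape)))
n_reducers = len(out_indices)  # total reducers == shuffle blocks per mapper

for ordinal, out_idx in enumerate(out_indices):
    # out_idx plays the role of reducer_index (a tuple);
    # ordinal plays the role of reducer_ordinal (a flat int).
    assert out_indices[ordinal] == out_idx

print(n_reducers)      # 6
print(out_indices[4])  # (1, 1)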

mars/core/operand/tests/test_core.py

Lines changed: 19 additions & 1 deletion

@@ -13,10 +13,11 @@
 # limitations under the License.

 import numpy as np
+import pandas as pd
 import pytest

 from ... import OutputType
-from .. import Operand, TileableOperandMixin, execute, estimate_size
+from .. import Operand, TileableOperandMixin, execute, estimate_size, ShuffleProxy


 class MyOperand(Operand, TileableOperandMixin):
@@ -130,3 +131,20 @@ def execute_normally(ctx, op):
     assert (
         t2.execute(extra_config={"operand_executors": operand_executors}).fetch() == 2
     )
+
+
+def test_shuffle(setup):
+    from ....dataframe import DataFrame
+
+    chunk_size, n_rows = 10, 100
+    df = DataFrame(
+        pd.DataFrame(np.random.rand(n_rows, 3), columns=list("abc")),
+        chunk_size=chunk_size,
+    )
+    chunk_graph = df.groupby(["a"]).apply(lambda x: x).build_graph(tile=True)
+    [proxy_chunk] = [c for c in chunk_graph if isinstance(c.op, ShuffleProxy)]
+    successors = chunk_graph.successors(proxy_chunk)
+    n_reducer = successors[0].op.n_reducers
+    assert n_reducer == len(successors), (n_reducer, len(successors))
+    assert len(set(c.op.n_reducers for c in successors)) == 1
+    assert sorted([c.op.reducer_ordinal for c in successors]) == list(range(n_reducer))
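
Taken together, the assertions fix the contract for the reducers downstream of a shuffle proxy: the successor count equals n_reducers, every successor reports the same n_reducers value, and the reducer_ordinal values form exactly range(n_reducers) with no gaps or duplicates.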

mars/dataframe/align.py

Lines changed: 8 additions & 1 deletion

@@ -683,6 +683,8 @@ def _gen_series_chunks(splits, out_shape, left_or_right, series):
     for out_idx in range(out_shape[0]):
         reduce_op = DataFrameIndexAlign(
             stage=OperandStage.reduce,
+            n_reducers=out_shape[0],
+            reducer_ordinal=out_idx,
             i=out_idx,
             sparse=proxy_chunk.issparse(),
             output_types=[OutputType.series],
@@ -820,6 +822,8 @@ def _gen_dataframe_chunks(splits, out_shape, left_or_right, df):
             )
             reduce_op = DataFrameIndexAlign(
                 stage=OperandStage.reduce,
+                n_reducers=out_shape[shuffle_axis],
+                reducer_ordinal=j,
                 i=j,
                 sparse=proxy_chunk.issparse(),
                 output_types=[OutputType.dataframe],
@@ -853,9 +857,12 @@ def _gen_dataframe_chunks(splits, out_shape, left_or_right, df):
     ).new_chunk(map_chunks, shape=())

     # gen reduce chunks
-    for out_idx in itertools.product(*(range(s) for s in out_shape)):
+    out_indices = list(itertools.product(*(range(s) for s in out_shape)))
+    for ordinal, out_idx in enumerate(out_indices):
         reduce_op = DataFrameIndexAlign(
             stage=OperandStage.reduce,
+            n_reducers=len(out_indices),
+            reducer_ordinal=ordinal,
             i=out_idx,
             sparse=proxy_chunk.issparse(),
             output_types=[OutputType.dataframe],

mars/dataframe/base/_duplicate.py

Lines changed: 4 additions & 0 deletions

@@ -232,6 +232,8 @@ def _tile_shuffle(cls, op: "DuplicateOperand", inp):
         reduce_op._method = "shuffle"
         reduce_op.stage = OperandStage.reduce
         reduce_op.reducer_phase = "drop_duplicates"
+        reduce_op.n_reducers = len(map_chunks)
+        reduce_op.reducer_ordinal = i
         reduce_op._shuffle_size = inp.chunk_shape[0]
         reduce_op._output_types = op.output_types
         reduce_chunk_params = map_chunks[0].params
@@ -250,6 +252,8 @@ def _tile_shuffle(cls, op: "DuplicateOperand", inp):
         put_back_op.stage = OperandStage.reduce
         put_back_op.reducer_phase = "put_back"
         put_back_op.reducer_index = (i,)
+        put_back_op.n_reducers = len(map_chunks)
+        put_back_op.reducer_ordinal = i
         if out.ndim == 2:
             put_back_chunk_params = map_chunks[i].params
         else:

mars/dataframe/groupby/aggregation.py

Lines changed: 8 additions & 2 deletions

@@ -319,6 +319,8 @@ def partition_merge_data(
         partition_shuffle_reduce = DataFrameGroupbySortShuffle(
             stage=OperandStage.reduce,
             reducer_index=(i, 0),
+            n_reducers=len(partition_chunks),
+            reducer_ordinal=i,
             output_types=output_types,
             **properties,
         )
@@ -436,9 +438,13 @@ def _gen_shuffle_chunks(cls, op, chunks):

         # generate reduce chunks
         reduce_chunks = []
-        for out_idx in itertools.product(*(range(s) for s in chunk_shape)):
+        out_indices = list(itertools.product(*(range(s) for s in chunk_shape)))
+        for ordinal, out_idx in enumerate(out_indices):
             reduce_op = DataFrameGroupByOperand(
-                stage=OperandStage.reduce, output_types=[OutputType.dataframe_groupby]
+                stage=OperandStage.reduce,
+                output_types=[OutputType.dataframe_groupby],
+                reducer_ordinal=ordinal,
+                n_reducers=len(out_indices),
             )
             reduce_chunks.append(
                 reduce_op.new_chunk(

mars/dataframe/groupby/core.py

Lines changed: 4 additions & 1 deletion

@@ -294,11 +294,14 @@ def tile(cls, op):

         # generate reduce chunks
         reduce_chunks = []
-        for out_idx in itertools.product(*(range(s) for s in chunk_shape)):
+        out_indices = list(itertools.product(*(range(s) for s in chunk_shape)))
+        for ordinal, out_idx in enumerate(out_indices):
             reduce_op = op.copy().reset_key()
             reduce_op._by = None
             reduce_op._output_types = [output_type]
             reduce_op.stage = OperandStage.reduce
+            reduce_op.reducer_ordinal = ordinal
+            reduce_op.n_reducers = len(out_indices)
             reduce_chunks.append(
                 reduce_op.new_chunk(
                     [proxy_chunk], shape=(np.nan, np.nan), index=out_idx

mars/dataframe/groupby/sample.py

Lines changed: 3 additions & 1 deletion

@@ -496,12 +496,14 @@ def _tile_distributed(cls, op: "GroupBySample", in_df, weights):
         )

         reduce_chunks = []
-        for src_chunk in in_df.chunks:
+        for ordinal, src_chunk in enumerate(in_df.chunks):
             new_op = op.copy().reset_key()
             new_op._weights = None
             new_op._output_types = [OutputType.tensor]
             new_op.stage = OperandStage.reduce
             new_op.reducer_index = (src_chunk.index[0],)
+            new_op.reducer_ordinal = ordinal
+            new_op.n_reducers = len(in_df.chunks)
             new_op._input_nsplits = np.array(in_df.nsplits[0])

             reduce_chunks.append(

mars/dataframe/merge/merge.py

Lines changed: 10 additions & 3 deletions

@@ -259,9 +259,12 @@ def _gen_shuffle_chunks(

         # gen reduce chunks
         reduce_chunks = []
-        for out_idx in itertools.product(*(range(s) for s in out_shape)):
+        out_indices = list(itertools.product(*(range(s) for s in out_shape)))
+        for ordinal, out_idx in enumerate(out_indices):
             reduce_op = DataFrameMergeAlign(
                 stage=OperandStage.reduce,
+                reducer_ordinal=ordinal,
+                n_reducers=len(out_indices),
                 sparse=proxy_chunk.issparse(),
                 output_types=[OutputType.dataframe],
             )
@@ -312,9 +315,13 @@ def _gen_both_shuffle_chunks(
         # gen reduce chunks
         left_reduce_chunks = []
         right_reduce_chunks = []
-        for out_idx in itertools.product(*(range(s) for s in out_shape)):
+        out_indices = list(itertools.product(*(range(s) for s in out_shape)))
+        for ordinal, out_idx in enumerate(out_indices):
             reduce_op = DataFrameMergeAlign(
-                stage=OperandStage.reduce, sparse=proxy_chunk.issparse()
+                stage=OperandStage.reduce,
+                sparse=proxy_chunk.issparse(),
+                reducer_ordinal=ordinal,
+                n_reducers=len(out_indices),
             )
             left_param = {
                 "shape": (np.nan, np.nan),

mars/dataframe/sort/psrs.py

Lines changed: 2 additions & 0 deletions

@@ -238,6 +238,8 @@ def partition_merge_data(
             stage=OperandStage.reduce,
             kind=kind,
             reducer_index=(i,),
+            reducer_ordinal=i,
+            n_reducers=len(partition_chunks),
             output_types=op.output_types,
             **cls._collect_op_properties(op)
         )

mars/learn/ensemble/_bagging.py

Lines changed: 3 additions & 2 deletions

@@ -133,7 +133,6 @@ class BaggingSample(LearnShuffle, LearnOperandMixin):
     feature_random_state = RandomStateField("feature_random_state")

     reducer_ratio: float = Float32Field("reducer_ratio")
-    n_reducers: int = Int64Field("n_reducers", default=None)
     column_offset: int = Int64Field("column_offset", default=None)

     chunk_shape: Tuple[int] = TupleField("chunk_shape", FieldTypes.int64)
@@ -295,7 +294,7 @@ def tile(cls, op: "BaggingSample"):

         n_reducers = (
             op.n_reducers
-            if op.n_reducers is not None
+            if getattr(op, "n_reducers", None)
             else max(1, int(in_sample.chunk_shape[0] * op.reducer_ratio))
         )

@@ -357,6 +356,8 @@ def tile(cls, op: "BaggingSample"):
             new_op = op.copy().reset_key()
             new_op.random_state = None
             new_op.stage = OperandStage.reduce
+            new_op.reducer_ordinal = idx
+            new_op.n_reducers = n_reducers
             new_op.chunk_shape = in_sample.chunk_shape
             new_op.n_estimators = op.n_estimators // n_reducers
             if remain_reducers:
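
One plausible reading of the getattr change in tile (my inference, not stated in the commit): n_reducers now lives on the base MapReduceOperand without a default, so reading op.n_reducers on an operand that never set it may fail rather than yield None; getattr(op, "n_reducers", None) keeps the fallback to the reducer_ratio-based estimate intact.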
